http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java new file mode 100644 index 0000000..4e3b932 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl.testutils; + +import java.util.ArrayList; +import java.util.List; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; + +/** + * @author unattributed + */ +public class ReflexiveProtocolHandler implements ProtocolHandler { + + private List<ProtocolMessage> messages = new ArrayList<>(); + + @Override + public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { + messages.add(msg); + return msg; + } + + @Override + public boolean canHandle(ProtocolMessage msg) { + return true; + } + + public List<ProtocolMessage> getMessages() { + return messages; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-web/.gitignore ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-web/.gitignore b/nar-bundles/framework-bundle/framework/cluster-web/.gitignore new file mode 100755 index 0000000..ea8c4bf --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-web/.gitignore @@ -0,0 +1 @@ +/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-web/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-web/pom.xml b/nar-bundles/framework-bundle/framework/cluster-web/pom.xml new file mode 100644 index 0000000..a7c39c6 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-web/pom.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <artifactId>framework-cluster-web</artifactId> + <packaging>jar</packaging> + <name>NiFi Framework Cluster Web</name> + <description>The clustering software for communicating with the NiFi web api.</description> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-properties</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>web-optimistic-locking</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-administration</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-user-actions</artifactId> + </dependency> + + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java new file mode 100644 index 0000000..44fb25a --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.context; + +import java.io.Serializable; +import java.util.List; +import org.apache.nifi.action.Action; +import org.apache.nifi.web.Revision; + +/** + * Contains contextual information about clustering that may be serialized + * between manager and node when communicating over HTTP. + */ +public interface ClusterContext extends Serializable { + + /** + * Returns a list of auditable actions. The list is modifiable + * and will never be null. + * @return a collection of actions + */ + List<Action> getActions(); + + Revision getRevision(); + + void setRevision(Revision revision); + + /** + * @return true if the request was sent by the cluster manager; false otherwise + */ + boolean isRequestSentByClusterManager(); + + /** + * Sets the flag to indicate if a request was sent by the cluster manager. + * @param flag true if the request was sent by the cluster manager; false otherwise + */ + void setRequestSentByClusterManager(boolean flag); + + /** + * Gets an id generation seed. This is used to ensure that nodes are able to generate the + * same id across the cluster. This is usually handled by the cluster manager creating the + * id, however for some actions (snippets, templates, etc) this is not possible. + * @return + */ + String getIdGenerationSeed(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java new file mode 100644 index 0000000..06907d2 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.context; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import org.apache.nifi.action.Action; +import org.apache.nifi.web.Revision; + +/** + * A basic implementation of the context. + */ +public class ClusterContextImpl implements ClusterContext, Serializable { + + private final List<Action> actions = new ArrayList<>(); + + private Revision revision; + + private boolean requestSentByClusterManager; + + private final String idGenerationSeed = UUID.randomUUID().toString(); + + @Override + public List<Action> getActions() { + return actions; + } + + @Override + public Revision getRevision() { + return revision; + } + + @Override + public void setRevision(Revision revision) { + this.revision = revision; + } + + @Override + public boolean isRequestSentByClusterManager() { + return requestSentByClusterManager; + } + + @Override + public void setRequestSentByClusterManager(boolean requestSentByClusterManager) { + this.requestSentByClusterManager = requestSentByClusterManager; + } + + @Override + public String getIdGenerationSeed() { + return this.idGenerationSeed; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java new file mode 100644 index 0000000..012e7c7 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.context; + +/** + * Manages a cluster context on a threadlocal. + */ +public class ClusterContextThreadLocal { + + private static final ThreadLocal<ClusterContext> contextHolder = new ThreadLocal<>(); + + public static void removeContext() { + contextHolder.remove(); + } + + public static ClusterContext createEmptyContext() { + return new ClusterContextImpl(); + } + + public static ClusterContext getContext() { + ClusterContext ctx = contextHolder.get(); + if(ctx == null) { + ctx = createEmptyContext(); + contextHolder.set(ctx); + } + return ctx; + } + + public static void setContext(final ClusterContext context) { + contextHolder.set(context); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java new file mode 100644 index 0000000..90b8a37 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web; + +import org.apache.nifi.cluster.context.ClusterContext; +import org.apache.nifi.cluster.context.ClusterContextThreadLocal; + +/** + * An optimistic locking manager that provides for optimistic locking in a clustered + * environment. + * + * @author unattributed + */ +public class ClusterAwareOptimisticLockingManager implements OptimisticLockingManager { + + private final OptimisticLockingManager optimisticLockingManager; + + public ClusterAwareOptimisticLockingManager(final OptimisticLockingManager optimisticLockingManager) { + this.optimisticLockingManager = optimisticLockingManager; + } + + @Override + public Revision checkRevision(Revision revision) throws InvalidRevisionException { + final Revision currentRevision = getRevision(); + if(currentRevision.equals(revision) == false) { + throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.", revision, currentRevision)); + } else { + return revision.increment(revision.getClientId()); + } + } + + @Override + public boolean isCurrent(Revision revision) { + return getRevision().equals(revision); + } + + @Override + public Revision getRevision() { + final ClusterContext ctx = ClusterContextThreadLocal.getContext(); + if(ctx == null || ctx.getRevision() == null) { + return optimisticLockingManager.getRevision(); + } else { + return ctx.getRevision(); + } + } + + @Override + public void setRevision(final Revision revision) { + final ClusterContext ctx = ClusterContextThreadLocal.getContext(); + if(ctx != null) { + ctx.setRevision(revision); + } + optimisticLockingManager.setRevision(revision); + } + + @Override + public Revision incrementRevision() { + final Revision currentRevision = getRevision(); + final Revision incRevision = currentRevision.increment(); + setRevision(incRevision); + return incRevision; + } + + @Override + public Revision incrementRevision(final String clientId) { + final Revision currentRevision = getRevision(); + final Revision incRevision = currentRevision.increment(clientId); + setRevision(incRevision); + return incRevision; + } + + @Override + public String getLastModifier() { + return optimisticLockingManager.getLastModifier(); + } + + @Override + public void setLastModifier(final String lastModifier) { + optimisticLockingManager.setLastModifier(lastModifier); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/.gitignore ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/.gitignore b/nar-bundles/framework-bundle/framework/cluster/.gitignore new file mode 100755 index 0000000..ea8c4bf --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/.gitignore @@ -0,0 +1 @@ +/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/pom.xml b/nar-bundles/framework-bundle/framework/cluster/pom.xml new file mode 100644 index 0000000..ad5dda7 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/pom.xml @@ -0,0 +1,133 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <artifactId>framework-cluster</artifactId> + <packaging>jar</packaging> + <name>NiFi Framework Cluster</name> + <description>The clustering software for NiFi.</description> + <dependencies> + + <!-- application core dependencies --> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-properties</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-logging-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-file-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>client-dto</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>framework-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>core-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>framework-cluster-protocol</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>framework-cluster-web</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-web-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-administration</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>site-to-site</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <version>1.9</version> + </dependency> + + <!-- third party dependencies --> + + <!-- sun dependencies --> + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + </dependency> + + <!-- commons dependencies --> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>commons-net</groupId> + <artifactId>commons-net</artifactId> + </dependency> + + <!-- jersey dependencies --> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </dependency> + + <!-- spring dependencies --> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + </dependency> + + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java new file mode 100644 index 0000000..183c7ca --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.client; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Timer; +import java.util.TimerTask; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; +import org.apache.nifi.cluster.protocol.impl.MulticastProtocolListener; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; +import org.apache.nifi.cluster.protocol.message.PingMessage; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.io.socket.multicast.MulticastConfiguration; +import org.apache.nifi.io.socket.multicast.MulticastUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple multicast test client that sends ping messages to a group address. + */ +public class MulticastTestClient { + + private static final Logger logger = LoggerFactory.getLogger(MulticastTestClient.class); + + private static final int PING_DELAY_SECONDS = 3; + + public static void main(final String... args) throws IOException { + + String group = System.getProperty("group", "225.0.0.0"); + if (group == null) { + System.out.println("Host system property 'group' was not given."); + return; + } + group = group.trim(); + if (group.length() == 0) { + System.out.println("Host system property 'group' must be non-empty."); + return; + } + + final String portStr = System.getProperty("port", "2222"); + final int port; + try { + port = Integer.parseInt(portStr); + } catch (final NumberFormatException nfe) { + System.out.println("Port system property 'port' was not a valid port."); + return; + } + + logger.info(String.format("Pinging every %s seconds using multicast address: %s:%s.", PING_DELAY_SECONDS, group, port)); + logger.info("Override defaults by using system properties '-Dgroup=<Class D IP>' and '-Dport=<unused port>'."); + logger.info("The test client may be stopped by entering a newline at the command line."); + + final InetSocketAddress addr = new InetSocketAddress(group, port); + final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<ProtocolMessage>(JaxbProtocolUtils.JAXB_CONTEXT); + final MulticastConfiguration multicastConfig = new MulticastConfiguration(); + multicastConfig.setReuseAddress(true); + + // setup listener + final MulticastProtocolListener listener = new MulticastProtocolListener(1, addr, multicastConfig, protocolContext); + listener.addHandler(new ProtocolHandler() { + @Override + public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { + final PingMessage pingMsg = (PingMessage) msg; + final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); + logger.info("Pinged at: " + sdf.format(pingMsg.getDate())); + return null; + } + + @Override + public boolean canHandle(ProtocolMessage msg) { + return true; + } + }); + + // setup socket + final MulticastSocket multicastSocket = MulticastUtils.createMulticastSocket(multicastConfig); + + // setup broadcaster + final Timer broadcaster = new Timer("Multicast Test Client", /** + * is daemon * + */ + true); + + try { + + // start listening + listener.start(); + + // start broadcasting + broadcaster.schedule(new TimerTask() { + + @Override + public void run() { + try { + + final PingMessage msg = new PingMessage(); + msg.setDate(new Date()); + + // marshal message to output stream + final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + marshaller.marshal(msg, baos); + final byte[] packetBytes = baos.toByteArray(); + + // send message + final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, addr); + multicastSocket.send(packet); + + } catch (final Exception ex) { + logger.warn("Failed to send message due to: " + ex, ex); + } + } + }, 0, PING_DELAY_SECONDS * 1000); + + // block until any input is received + System.in.read(); + + } finally { + broadcaster.cancel(); + if (listener.isRunning()) { + listener.stop(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java new file mode 100644 index 0000000..6bc5d6c --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.event; + +import java.util.Date; +import org.apache.commons.lang3.StringUtils; + +/** + * Events describe the occurrence of something noteworthy. They record the + * event's source, a timestamp, a description, and a category. + * + * @author unattributed + * + * @Immutable + */ +public class Event { + + public static enum Category { + + DEBUG, + INFO, + WARN + } + + private final String source; + + private final long timestamp; + + private final Category category; + + private final String message; + + /** + * Creates an event with the current time as the timestamp and a category of + * "INFO". + * + * @param source the source + * @param message the description + */ + public Event(final String source, final String message) { + this(source, message, Category.INFO); + } + + /** + * Creates an event with the current time as the timestamp. + * + * @param source the source + * @param message the description + * @param category the event category + */ + public Event(final String source, final String message, final Category category) { + this(source, message, category, new Date().getTime()); + } + + /** + * Creates an event with the a category of "INFO". + * + * @param source the source + * @param message the description + * @param timestamp the time of occurrence + */ + public Event(final String source, final String message, final long timestamp) { + this(source, message, Category.INFO, timestamp); + } + + /** + * Creates an event. + * + * @param source the source + * @param message the description + * @param category the event category + * @param timestamp the time of occurrence + */ + public Event(final String source, final String message, final Category category, final long timestamp) { + + if (StringUtils.isBlank(source)) { + throw new IllegalArgumentException("Source may not be empty or null."); + } else if (StringUtils.isBlank(message)) { + throw new IllegalArgumentException("Event message may not be empty or null."); + } else if (category == null) { + throw new IllegalArgumentException("Event category may not be null."); + } else if (timestamp < 0) { + throw new IllegalArgumentException("Timestamp may not be negative: " + timestamp); + } + + this.source = source; + this.message = message; + this.category = category; + this.timestamp = timestamp; + } + + public Category getCategory() { + return category; + } + + public String getMessage() { + return message; + } + + public String getSource() { + return source; + } + + public long getTimestamp() { + return timestamp; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java new file mode 100644 index 0000000..f9dfb00 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.event; + +import java.util.List; + +/** + * Manages an ordered list of events. The event history size dictates the total + * number of events to manage for a given source at a given time. When the size + * is exceeded, the oldest event for that source is evicted. + * + * @author unattributed + */ +public interface EventManager { + + /** + * Adds an event to the manager. + * + * @param event an Event + */ + void addEvent(Event event); + + /** + * Returns a list of events for a given source sorted by the event's + * timestamp where the most recent event is first in the list. + * + * @param eventSource the source + * + * @return the list of events + */ + List<Event> getEvents(String eventSource); + + /* + * Returns the most recent event for the source. If no events exist, then + * null is returned. + */ + Event getMostRecentEvent(String eventSource); + + /* + * Clears all events for the given source. + */ + void clearEventHistory(String eventSource); + + /** + * Returns the history size. + * + * @return the history size + */ + int getEventHistorySize(); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java new file mode 100644 index 0000000..7fadc78 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.event.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import org.apache.nifi.cluster.event.Event; +import org.apache.nifi.cluster.event.EventManager; + +/** + * Implements the EventManager. + * + * @author unattributed + */ +public class EventManagerImpl implements EventManager { + + /** + * associates the source ID with an ordered queue of events, ordered by most + * recent event + */ + private final Map<String, Queue<Event>> eventsMap = new HashMap<>(); + + /** + * the number of events to maintain for a given source + */ + private final int eventHistorySize; + + /** + * Creates an instance. + * + * @param eventHistorySize the number of events to manage for a given + * source. Value must be positive. + */ + public EventManagerImpl(final int eventHistorySize) { + if (eventHistorySize <= 0) { + throw new IllegalArgumentException("Event history size must be positive: " + eventHistorySize); + } + this.eventHistorySize = eventHistorySize; + } + + @Override + public void addEvent(final Event event) { + + if (event == null) { + throw new IllegalArgumentException("Event may not be null."); + } + + Queue<Event> events = eventsMap.get(event.getSource()); + if (events == null) { + // no events from this source, so add a new queue to the map + events = new PriorityQueue<>(eventHistorySize, createEventComparator()); + eventsMap.put(event.getSource(), events); + } + + // add event + events.add(event); + + // if we exceeded the history size, then evict the oldest event + if (events.size() > eventHistorySize) { + removeOldestEvent(events); + } + + } + + @Override + public List<Event> getEvents(final String eventSource) { + final Queue<Event> events = eventsMap.get(eventSource); + if (events == null) { + return Collections.EMPTY_LIST; + } else { + return Collections.unmodifiableList(new ArrayList<>(events)); + } + } + + @Override + public int getEventHistorySize() { + return eventHistorySize; + } + + @Override + public Event getMostRecentEvent(final String eventSource) { + final Queue<Event> events = eventsMap.get(eventSource); + if (events == null) { + return null; + } else { + return events.peek(); + } + } + + @Override + public void clearEventHistory(final String eventSource) { + eventsMap.remove(eventSource); + } + + private Comparator createEventComparator() { + return new Comparator<Event>() { + @Override + public int compare(final Event o1, final Event o2) { + // orders events by most recent first + return (int) (o2.getTimestamp() - o1.getTimestamp()); + } + }; + } + + private void removeOldestEvent(final Collection<Event> events) { + + if (events.isEmpty()) { + return; + } + + Event oldestEvent = null; + for (final Event event : events) { + if (oldestEvent == null || oldestEvent.getTimestamp() > event.getTimestamp()) { + oldestEvent = event; + } + } + + events.remove(oldestEvent); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java new file mode 100644 index 0000000..2e3d278 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.firewall; + +/** + * Defines the interface for restricting external client connections to a set of + * hosts or IPs. + */ +public interface ClusterNodeFirewall { + + /** + * Returns true if the given host or IP is permissible through the firewall; + * false otherwise. + * + * If an IP is given, then it must be formatted in dotted decimal notation. + * @param hostOrIp + * @return + */ + boolean isPermissible(String hostOrIp); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java new file mode 100644 index 0000000..bcee661 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.firewall.impl; + +import java.io.*; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.*; +import org.apache.commons.net.util.SubnetUtils; +import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; +import org.apache.nifi.file.FileUtils; +import org.apache.nifi.logging.NiFiLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A file-based implementation of the ClusterFirewall interface. The class is + * configured with a file. If the file is empty, then everything is permissible. + * Otherwise, the file should contain hostnames or IPs formatted as dotted + * decimals with an optional CIDR suffix. Each entry must be separated by a + * newline. An example configuration is given below: + * + * <code> + * # hash character is a comment delimiter + * 1.2.3.4 # exact IP + * some.host.name # a host name + * 4.5.6.7/8 # range of CIDR IPs + * 9.10.11.12/13 # a smaller range of CIDR IPs + * </code> + * + * This class allows for synchronization with an optionally configured restore + * directory. If configured, then at startup, if the either the config file or + * the restore directory's copy is missing, then the configuration file will be + * copied to the appropriate location. If both restore directory contains a copy + * that is different in content to configuration file, then an exception is + * thrown at construction time. + */ +public class FileBasedClusterNodeFirewall implements ClusterNodeFirewall { + + private final File config; + + private final File restoreDirectory; + + private final Collection<SubnetUtils.SubnetInfo> subnetInfos = new ArrayList<>(); + + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(FileBasedClusterNodeFirewall.class)); + + public FileBasedClusterNodeFirewall(final File config) throws IOException { + this(config, null); + } + + public FileBasedClusterNodeFirewall(final File config, final File restoreDirectory) throws IOException { + + if (config == null) { + throw new IllegalArgumentException("Firewall configuration file may not be null."); + } + + this.config = config; + this.restoreDirectory = restoreDirectory; + + if (restoreDirectory != null) { + // synchronize with restore directory + try { + syncWithRestoreDirectory(); + } catch (final IOException ioe) { + throw new RuntimeException(ioe); + } + } + + if (!config.exists() && !config.createNewFile()) { + throw new IOException("Firewall configuration file did not exist and could not be created: " + config.getAbsolutePath()); + } + + logger.info("Loading cluster firewall configuration."); + parseConfig(config); + logger.info("Cluster firewall configuration loaded."); + } + + @Override + public boolean isPermissible(final String hostOrIp) { + try { + + // if no rules, then permit everything + if (subnetInfos.isEmpty()) { + return true; + } + + final String ip; + try { + ip = InetAddress.getByName(hostOrIp).getHostAddress(); + } catch (final UnknownHostException uhe) { + logger.warn("Blocking unknown host: " + hostOrIp, uhe); + return false; + } + + // check each subnet to see if IP is in range + for (final SubnetUtils.SubnetInfo subnetInfo : subnetInfos) { + if (subnetInfo.isInRange(ip)) { + return true; + } + } + + // no match + return false; + + } catch (final IllegalArgumentException iae) { + return false; + } + } + + private void syncWithRestoreDirectory() throws IOException { + + // sanity check that restore directory is a directory, creating it if necessary + FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory); + + // check that restore directory is not the same as the primary directory + if (config.getParentFile().getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) { + throw new IllegalStateException( + String.format("Cluster firewall configuration file '%s' cannot be in the restore directory '%s' ", + config.getAbsolutePath(), restoreDirectory.getAbsolutePath())); + } + + // the restore copy will have same file name, but reside in a different directory + final File restoreFile = new File(restoreDirectory, config.getName()); + + // sync the primary copy with the restore copy + FileUtils.syncWithRestore(config, restoreFile, logger); + + } + + private void parseConfig(final File config) throws IOException { + + // clear old information + subnetInfos.clear(); + try (BufferedReader br = new BufferedReader(new FileReader(config))) { + + String ipOrHostLine; + String ipCidr; + int totalIpsAdded = 0; + while ((ipOrHostLine = br.readLine()) != null) { + + // cleanup whitespace + ipOrHostLine = ipOrHostLine.trim(); + + if (ipOrHostLine.isEmpty() || ipOrHostLine.startsWith("#")) { + // skip empty lines or comments + continue; + } else if (ipOrHostLine.contains("#")) { + // parse out comments in IP containing lines + ipOrHostLine = ipOrHostLine.substring(0, ipOrHostLine.indexOf("#")).trim(); + } + + // if given a complete IP, then covert to CIDR + if (ipOrHostLine.contains("/")) { + ipCidr = ipOrHostLine; + } else if (ipOrHostLine.contains("\\")) { + logger.warn("CIDR IP notation uses forward slashes '/'. Replacing backslash '\\' with forward slash'/' for '" + ipOrHostLine + "'"); + ipCidr = ipOrHostLine.replace("\\", "/"); + } else { + try { + ipCidr = InetAddress.getByName(ipOrHostLine).getHostAddress(); + if (!ipOrHostLine.equals(ipCidr)) { + logger.debug(String.format("Resolved host '%s' to ip '%s'", ipOrHostLine, ipCidr)); + } + ipCidr += "/32"; + logger.debug("Adding CIDR to exact IP: " + ipCidr); + } catch (final UnknownHostException uhe) { + logger.warn("Firewall is skipping unknown host address: " + ipOrHostLine); + continue; + } + } + + try { + logger.debug("Adding CIDR IP to firewall: " + ipCidr); + final SubnetUtils subnetUtils = new SubnetUtils(ipCidr); + subnetUtils.setInclusiveHostCount(true); + subnetInfos.add(subnetUtils.getInfo()); + totalIpsAdded++; + } catch (final IllegalArgumentException iae) { + logger.warn("Firewall is skipping invalid CIDR address: " + ipOrHostLine); + } + + } + + if (totalIpsAdded == 0) { + logger.info("No IPs added to firewall. Firewall will accept all requests."); + } else { + logger.info(String.format("Added %d IP(s) to firewall. Only requests originating from the configured IPs will be accepted.", totalIpsAdded)); + } + + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java new file mode 100644 index 0000000..eedb88f --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.flow; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.StandardDataFlow; + +/** + * A dataflow with additional information about the cluster. + * + * @author unattributed + */ +public class ClusterDataFlow { + + private final StandardDataFlow dataFlow; + + private final NodeIdentifier primaryNodeId; + + public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId) { + this.dataFlow = dataFlow; + this.primaryNodeId = primaryNodeId; + } + + public NodeIdentifier getPrimaryNodeId() { + return primaryNodeId; + } + + public StandardDataFlow getDataFlow() { + return dataFlow; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java new file mode 100644 index 0000000..6ff15a7 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.flow; + +/** + * A base exception for data access exceptions. + * + * @author unattributed + */ +public class DaoException extends RuntimeException { + + public DaoException() { + } + + public DaoException(String msg) { + super(msg); + } + + public DaoException(Throwable cause) { + super(cause); + } + + public DaoException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java new file mode 100644 index 0000000..a273704 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.flow; + +/** + * A data access object for loading and saving the flow managed by the cluster. + * + * @author unattributed + */ +public interface DataFlowDao { + + /** + * Loads the cluster's dataflow. + * + * @return the dataflow or null if no dataflow exists + * + * @throws DaoException if the dataflow was unable to be loaded + */ + ClusterDataFlow loadDataFlow() throws DaoException; + + /** + * Saves the cluster's dataflow. + * + * + * @param dataFlow + * @throws DaoException if the dataflow was unable to be saved + */ + void saveDataFlow(ClusterDataFlow dataFlow) throws DaoException; + + /** + * Sets the state of the dataflow. If the dataflow does not exist, then an + * exception is thrown. + * + * @param flowState the state of the dataflow + * + * @throws DaoException if the state was unable to be updated + */ + void setPersistedFlowState(PersistedFlowState flowState) throws DaoException; + + /** + * Gets the state of the dataflow. + * + * @return the state of the dataflow + * + * @throws DaoException if the state was unable to be retrieved + */ + PersistedFlowState getPersistedFlowState() throws DaoException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java new file mode 100644 index 0000000..339d904 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.flow; + +import java.util.Set; +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +/** + * A service for managing the cluster's flow. The service will attempt to keep + * the cluster's dataflow current while respecting the value of the configured + * retrieval delay. + * + * The eligible retrieval time is reset with the configured delay every time the + * flow state is set to STALE. If the state is set to UNKNOWN or CURRENT, then + * the flow will not be retrieved. + * + * Clients must call start() and stop() to initialize and stop the instance. + * + * @author unattributed + */ +public interface DataFlowManagementService { + + /** + * Starts the instance. Start may only be called if the instance is not + * running. + */ + void start(); + + /** + * Stops the instance. Stop may only be called if the instance is running. + */ + void stop(); + + /** + * @return true if the instance is started; false otherwise. + */ + boolean isRunning(); + + /** + * Loads the dataflow. + * + * @return the dataflow or null if no dataflow exists + */ + ClusterDataFlow loadDataFlow(); + + /** + * Updates the dataflow with the given primary node identifier. + * + * @param nodeId the node identifier + * + * @throws DaoException if the update failed + */ + void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException; + + /** + * Sets the state of the flow. + * + * @param flowState the state + * + * @see PersistedFlowState + */ + void setPersistedFlowState(PersistedFlowState flowState); + + /** + * @return the state of the flow + */ + PersistedFlowState getPersistedFlowState(); + + /** + * @return true if the flow is current; false otherwise. + */ + boolean isFlowCurrent(); + + /** + * Sets the node identifiers to use when attempting to retrieve the flow. + * + * @param nodeIds the node identifiers + */ + void setNodeIds(Set<NodeIdentifier> nodeIds); + + /** + * Returns the set of node identifiers the service is using to retrieve the + * flow. + * + * @return the set of node identifiers the service is using to retrieve the + * flow. + */ + Set<NodeIdentifier> getNodeIds(); + + /** + * @return the retrieval delay in seconds + */ + int getRetrievalDelaySeconds(); + + /** + * Sets the retrieval delay. + * + * @param delay the retrieval delay in seconds + */ + void setRetrievalDelay(String delay); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java new file mode 100644 index 0000000..b3afc6e --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/PersistedFlowState.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.flow; + +/** + * Represents the various state of a flow managed by the cluster. + * + * The semantics of the values are: + * <ul> + * <li> CURRENT - the flow is current </li> + * <li> STALE - the flow is not current, but is eligible to be updated. </li> + * <li> UNKNOWN - the flow is not current and is not eligible to be updated. + * </li> + * </ul> + * + * @author unattributed + */ +public enum PersistedFlowState { + + CURRENT, + STALE, + UNKNOWN +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java new file mode 100644 index 0000000..ce5a08b --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.flow; + +/** + * Represents the exceptional case when a caller is requesting the current flow, + * but a current flow is not available. + * + * @author unattributed + */ +public class StaleFlowException extends RuntimeException { + + public StaleFlowException(String message, Throwable cause) { + super(message, cause); + } + + public StaleFlowException(String message) { + super(message); + } + + public StaleFlowException(Throwable cause) { + super(cause); + } + + public StaleFlowException() { + } + +}