http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java deleted file mode 100644 index 4e3b932..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.impl.testutils; - -import java.util.ArrayList; -import java.util.List; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; - -/** - * @author unattributed - */ -public class ReflexiveProtocolHandler implements ProtocolHandler { - - private List<ProtocolMessage> messages = new ArrayList<>(); - - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - messages.add(msg); - return msg; - } - - @Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - - public List<ProtocolMessage> getMessages() { - return messages; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-web/.gitignore ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-web/.gitignore b/nar-bundles/framework-bundle/framework/cluster-web/.gitignore deleted file mode 100755 index ea8c4bf..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-web/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-web/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-web/pom.xml b/nar-bundles/framework-bundle/framework/cluster-web/pom.xml deleted file mode 100644 index a7c39c6..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-web/pom.xml +++ /dev/null @@ -1,50 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-parent</artifactId> - <version>0.0.1-SNAPSHOT</version> - </parent> - <artifactId>framework-cluster-web</artifactId> - <packaging>jar</packaging> - <name>NiFi Framework Cluster Web</name> - <description>The clustering software for communicating with the NiFi web api.</description> - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>web-optimistic-locking</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-administration</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-user-actions</artifactId> - </dependency> - - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java deleted file mode 100644 index 44fb25a..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.context; - -import java.io.Serializable; -import java.util.List; -import org.apache.nifi.action.Action; -import org.apache.nifi.web.Revision; - -/** - * Contains contextual information about clustering that may be serialized - * between manager and node when communicating over HTTP. - */ -public interface ClusterContext extends Serializable { - - /** - * Returns a list of auditable actions. The list is modifiable - * and will never be null. - * @return a collection of actions - */ - List<Action> getActions(); - - Revision getRevision(); - - void setRevision(Revision revision); - - /** - * @return true if the request was sent by the cluster manager; false otherwise - */ - boolean isRequestSentByClusterManager(); - - /** - * Sets the flag to indicate if a request was sent by the cluster manager. - * @param flag true if the request was sent by the cluster manager; false otherwise - */ - void setRequestSentByClusterManager(boolean flag); - - /** - * Gets an id generation seed. This is used to ensure that nodes are able to generate the - * same id across the cluster. This is usually handled by the cluster manager creating the - * id, however for some actions (snippets, templates, etc) this is not possible. 
- * @return - */ - String getIdGenerationSeed(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java deleted file mode 100644 index 06907d2..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.context; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import org.apache.nifi.action.Action; -import org.apache.nifi.web.Revision; - -/** - * A basic implementation of the context. 
- */ -public class ClusterContextImpl implements ClusterContext, Serializable { - - private final List<Action> actions = new ArrayList<>(); - - private Revision revision; - - private boolean requestSentByClusterManager; - - private final String idGenerationSeed = UUID.randomUUID().toString(); - - @Override - public List<Action> getActions() { - return actions; - } - - @Override - public Revision getRevision() { - return revision; - } - - @Override - public void setRevision(Revision revision) { - this.revision = revision; - } - - @Override - public boolean isRequestSentByClusterManager() { - return requestSentByClusterManager; - } - - @Override - public void setRequestSentByClusterManager(boolean requestSentByClusterManager) { - this.requestSentByClusterManager = requestSentByClusterManager; - } - - @Override - public String getIdGenerationSeed() { - return this.idGenerationSeed; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java deleted file mode 100644 index 012e7c7..0000000 --- a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.context; - -/** - * Manages a cluster context on a threadlocal. - */ -public class ClusterContextThreadLocal { - - private static final ThreadLocal<ClusterContext> contextHolder = new ThreadLocal<>(); - - public static void removeContext() { - contextHolder.remove(); - } - - public static ClusterContext createEmptyContext() { - return new ClusterContextImpl(); - } - - public static ClusterContext getContext() { - ClusterContext ctx = contextHolder.get(); - if(ctx == null) { - ctx = createEmptyContext(); - contextHolder.set(ctx); - } - return ctx; - } - - public static void setContext(final ClusterContext context) { - contextHolder.set(context); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java b/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java deleted file mode 100644 index 90b8a37..0000000 --- 
a/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.web; - -import org.apache.nifi.cluster.context.ClusterContext; -import org.apache.nifi.cluster.context.ClusterContextThreadLocal; - -/** - * An optimistic locking manager that provides for optimistic locking in a clustered - * environment. 
- * - * @author unattributed - */ -public class ClusterAwareOptimisticLockingManager implements OptimisticLockingManager { - - private final OptimisticLockingManager optimisticLockingManager; - - public ClusterAwareOptimisticLockingManager(final OptimisticLockingManager optimisticLockingManager) { - this.optimisticLockingManager = optimisticLockingManager; - } - - @Override - public Revision checkRevision(Revision revision) throws InvalidRevisionException { - final Revision currentRevision = getRevision(); - if(currentRevision.equals(revision) == false) { - throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.", revision, currentRevision)); - } else { - return revision.increment(revision.getClientId()); - } - } - - @Override - public boolean isCurrent(Revision revision) { - return getRevision().equals(revision); - } - - @Override - public Revision getRevision() { - final ClusterContext ctx = ClusterContextThreadLocal.getContext(); - if(ctx == null || ctx.getRevision() == null) { - return optimisticLockingManager.getRevision(); - } else { - return ctx.getRevision(); - } - } - - @Override - public void setRevision(final Revision revision) { - final ClusterContext ctx = ClusterContextThreadLocal.getContext(); - if(ctx != null) { - ctx.setRevision(revision); - } - optimisticLockingManager.setRevision(revision); - } - - @Override - public Revision incrementRevision() { - final Revision currentRevision = getRevision(); - final Revision incRevision = currentRevision.increment(); - setRevision(incRevision); - return incRevision; - } - - @Override - public Revision incrementRevision(final String clientId) { - final Revision currentRevision = getRevision(); - final Revision incRevision = currentRevision.increment(clientId); - setRevision(incRevision); - return incRevision; - } - - @Override - public String getLastModifier() { - return optimisticLockingManager.getLastModifier(); - } - - @Override - public void 
setLastModifier(final String lastModifier) { - optimisticLockingManager.setLastModifier(lastModifier); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/.gitignore ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/.gitignore b/nar-bundles/framework-bundle/framework/cluster/.gitignore deleted file mode 100755 index ea8c4bf..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/pom.xml b/nar-bundles/framework-bundle/framework/cluster/pom.xml deleted file mode 100644 index 6712802..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/pom.xml +++ /dev/null @@ -1,132 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-parent</artifactId> - <version>0.0.1-SNAPSHOT</version> - </parent> - <artifactId>framework-cluster</artifactId> - <packaging>jar</packaging> - <name>NiFi Framework Cluster</name> - <description>The clustering software for NiFi.</description> - <dependencies> - - <!-- application core dependencies --> - - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-logging-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>client-dto</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>framework-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>core-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>framework-cluster-protocol</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>framework-cluster-web</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-web-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-administration</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>site-to-site</artifactId> - </dependency> - 
<dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - </dependency> - - <!-- third party dependencies --> - - <!-- sun dependencies --> - <dependency> - <groupId>javax.servlet</groupId> - <artifactId>javax.servlet-api</artifactId> - </dependency> - - <!-- commons dependencies --> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> - <dependency> - <groupId>commons-net</groupId> - <artifactId>commons-net</artifactId> - </dependency> - - <!-- jersey dependencies --> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-client</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-json</artifactId> - </dependency> - - <!-- spring dependencies --> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-core</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-beans</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-context</artifactId> - </dependency> - - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java deleted file mode 100644 index 0b70c61..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java +++ /dev/null @@ -1,151 +0,0 @@ -/* 
- * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.client; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Locale; -import java.util.Timer; -import java.util.TimerTask; - -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.impl.MulticastProtocolListener; -import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; -import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; -import org.apache.nifi.cluster.protocol.message.PingMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.apache.nifi.io.socket.multicast.MulticastUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Simple multicast test client that sends ping 
messages to a group address. - */ -public class MulticastTestClient { - - private static final Logger logger = LoggerFactory.getLogger(MulticastTestClient.class); - - private static final int PING_DELAY_SECONDS = 3; - - public static void main(final String... args) throws IOException { - - String group = System.getProperty("group", "225.0.0.0"); - if (group == null) { - System.out.println("Host system property 'group' was not given."); - return; - } - group = group.trim(); - if (group.length() == 0) { - System.out.println("Host system property 'group' must be non-empty."); - return; - } - - final String portStr = System.getProperty("port", "2222"); - final int port; - try { - port = Integer.parseInt(portStr); - } catch (final NumberFormatException nfe) { - System.out.println("Port system property 'port' was not a valid port."); - return; - } - - logger.info(String.format("Pinging every %s seconds using multicast address: %s:%s.", PING_DELAY_SECONDS, group, port)); - logger.info("Override defaults by using system properties '-Dgroup=<Class D IP>' and '-Dport=<unused port>'."); - logger.info("The test client may be stopped by entering a newline at the command line."); - - final InetSocketAddress addr = new InetSocketAddress(group, port); - final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<ProtocolMessage>(JaxbProtocolUtils.JAXB_CONTEXT); - final MulticastConfiguration multicastConfig = new MulticastConfiguration(); - multicastConfig.setReuseAddress(true); - - // setup listener - final MulticastProtocolListener listener = new MulticastProtocolListener(1, addr, multicastConfig, protocolContext); - listener.addHandler(new ProtocolHandler() { - @Override - public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { - final PingMessage pingMsg = (PingMessage) msg; - final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss", Locale.US); - logger.info("Pinged at: " + sdf.format(pingMsg.getDate())); - return null; - } - - 
@Override - public boolean canHandle(ProtocolMessage msg) { - return true; - } - }); - - // setup socket - final MulticastSocket multicastSocket = MulticastUtils.createMulticastSocket(multicastConfig); - - // setup broadcaster - final Timer broadcaster = new Timer("Multicast Test Client", /** - * is daemon * - */ - true); - - try { - - // start listening - listener.start(); - - // start broadcasting - broadcaster.schedule(new TimerTask() { - - @Override - public void run() { - try { - - final PingMessage msg = new PingMessage(); - msg.setDate(new Date()); - - // marshal message to output stream - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - marshaller.marshal(msg, baos); - final byte[] packetBytes = baos.toByteArray(); - - // send message - final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, addr); - multicastSocket.send(packet); - - } catch (final Exception ex) { - logger.warn("Failed to send message due to: " + ex, ex); - } - } - }, 0, PING_DELAY_SECONDS * 1000); - - // block until any input is received - System.in.read(); - - } finally { - broadcaster.cancel(); - if (listener.isRunning()) { - listener.stop(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java deleted file mode 100644 index 6bc5d6c..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.event; - -import java.util.Date; -import org.apache.commons.lang3.StringUtils; - -/** - * Events describe the occurrence of something noteworthy. They record the - * event's source, a timestamp, a description, and a category. - * - * @author unattributed - * - * @Immutable - */ -public class Event { - - public static enum Category { - - DEBUG, - INFO, - WARN - } - - private final String source; - - private final long timestamp; - - private final Category category; - - private final String message; - - /** - * Creates an event with the current time as the timestamp and a category of - * "INFO". - * - * @param source the source - * @param message the description - */ - public Event(final String source, final String message) { - this(source, message, Category.INFO); - } - - /** - * Creates an event with the current time as the timestamp. - * - * @param source the source - * @param message the description - * @param category the event category - */ - public Event(final String source, final String message, final Category category) { - this(source, message, category, new Date().getTime()); - } - - /** - * Creates an event with the a category of "INFO". 
- * - * @param source the source - * @param message the description - * @param timestamp the time of occurrence - */ - public Event(final String source, final String message, final long timestamp) { - this(source, message, Category.INFO, timestamp); - } - - /** - * Creates an event. - * - * @param source the source - * @param message the description - * @param category the event category - * @param timestamp the time of occurrence - */ - public Event(final String source, final String message, final Category category, final long timestamp) { - - if (StringUtils.isBlank(source)) { - throw new IllegalArgumentException("Source may not be empty or null."); - } else if (StringUtils.isBlank(message)) { - throw new IllegalArgumentException("Event message may not be empty or null."); - } else if (category == null) { - throw new IllegalArgumentException("Event category may not be null."); - } else if (timestamp < 0) { - throw new IllegalArgumentException("Timestamp may not be negative: " + timestamp); - } - - this.source = source; - this.message = message; - this.category = category; - this.timestamp = timestamp; - } - - public Category getCategory() { - return category; - } - - public String getMessage() { - return message; - } - - public String getSource() { - return source; - } - - public long getTimestamp() { - return timestamp; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java deleted file mode 100644 index f9dfb00..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java 
/**
 * Manages an ordered list of events. The event history size dictates the total
 * number of events to manage for a given source at a given time. When the size
 * is exceeded, the oldest event for that source is evicted.
 *
 * @author unattributed
 */
public interface EventManager {

    /**
     * Adds an event to the manager.
     *
     * @param event an Event
     */
    void addEvent(Event event);

    /**
     * Returns a list of events for a given source sorted by the event's
     * timestamp where the most recent event is first in the list.
     *
     * @param eventSource the source
     *
     * @return the list of events
     */
    List<Event> getEvents(String eventSource);

    /**
     * Returns the most recent event for the source. If no events exist, then
     * null is returned.
     *
     * @param eventSource the source
     *
     * @return the most recent event or null if the source has no events
     */
    Event getMostRecentEvent(String eventSource);

    /**
     * Clears all events for the given source.
     *
     * @param eventSource the source
     */
    void clearEventHistory(String eventSource);

    /**
     * Returns the history size.
     *
     * @return the history size
     */
    int getEventHistorySize();

}
- */ -package org.apache.nifi.cluster.event.impl; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Queue; -import org.apache.nifi.cluster.event.Event; -import org.apache.nifi.cluster.event.EventManager; - -/** - * Implements the EventManager. - * - * @author unattributed - */ -public class EventManagerImpl implements EventManager { - - /** - * associates the source ID with an ordered queue of events, ordered by most - * recent event - */ - private final Map<String, Queue<Event>> eventsMap = new HashMap<>(); - - /** - * the number of events to maintain for a given source - */ - private final int eventHistorySize; - - /** - * Creates an instance. - * - * @param eventHistorySize the number of events to manage for a given - * source. Value must be positive. - */ - public EventManagerImpl(final int eventHistorySize) { - if (eventHistorySize <= 0) { - throw new IllegalArgumentException("Event history size must be positive: " + eventHistorySize); - } - this.eventHistorySize = eventHistorySize; - } - - @Override - public void addEvent(final Event event) { - - if (event == null) { - throw new IllegalArgumentException("Event may not be null."); - } - - Queue<Event> events = eventsMap.get(event.getSource()); - if (events == null) { - // no events from this source, so add a new queue to the map - events = new PriorityQueue<>(eventHistorySize, createEventComparator()); - eventsMap.put(event.getSource(), events); - } - - // add event - events.add(event); - - // if we exceeded the history size, then evict the oldest event - if (events.size() > eventHistorySize) { - removeOldestEvent(events); - } - - } - - @Override - public List<Event> getEvents(final String eventSource) { - final Queue<Event> events = eventsMap.get(eventSource); - if (events == null) { - return Collections.EMPTY_LIST; 
- } else { - return Collections.unmodifiableList(new ArrayList<>(events)); - } - } - - @Override - public int getEventHistorySize() { - return eventHistorySize; - } - - @Override - public Event getMostRecentEvent(final String eventSource) { - final Queue<Event> events = eventsMap.get(eventSource); - if (events == null) { - return null; - } else { - return events.peek(); - } - } - - @Override - public void clearEventHistory(final String eventSource) { - eventsMap.remove(eventSource); - } - - private Comparator createEventComparator() { - return new Comparator<Event>() { - @Override - public int compare(final Event o1, final Event o2) { - // orders events by most recent first - return (int) (o2.getTimestamp() - o1.getTimestamp()); - } - }; - } - - private void removeOldestEvent(final Collection<Event> events) { - - if (events.isEmpty()) { - return; - } - - Event oldestEvent = null; - for (final Event event : events) { - if (oldestEvent == null || oldestEvent.getTimestamp() > event.getTimestamp()) { - oldestEvent = event; - } - } - - events.remove(oldestEvent); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java deleted file mode 100644 index 2e3d278..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
/**
 * Defines the interface for restricting external client connections to a set of
 * hosts or IPs.
 */
public interface ClusterNodeFirewall {

    /**
     * Returns true if the given host or IP is permissible through the firewall;
     * false otherwise.
     *
     * If an IP is given, then it must be formatted in dotted decimal notation.
     *
     * @param hostOrIp the host name or dotted-decimal IP to check
     * @return true if the host or IP is permitted; false otherwise
     */
    boolean isPermissible(String hostOrIp);

}
- */ -package org.apache.nifi.cluster.firewall.impl; - -import java.io.*; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.*; -import org.apache.commons.net.util.SubnetUtils; -import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; -import org.apache.nifi.util.file.FileUtils; -import org.apache.nifi.logging.NiFiLog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A file-based implementation of the ClusterFirewall interface. The class is - * configured with a file. If the file is empty, then everything is permissible. - * Otherwise, the file should contain hostnames or IPs formatted as dotted - * decimals with an optional CIDR suffix. Each entry must be separated by a - * newline. An example configuration is given below: - * - * <code> - * # hash character is a comment delimiter - * 1.2.3.4 # exact IP - * some.host.name # a host name - * 4.5.6.7/8 # range of CIDR IPs - * 9.10.11.12/13 # a smaller range of CIDR IPs - * </code> - * - * This class allows for synchronization with an optionally configured restore - * directory. If configured, then at startup, if the either the config file or - * the restore directory's copy is missing, then the configuration file will be - * copied to the appropriate location. If both restore directory contains a copy - * that is different in content to configuration file, then an exception is - * thrown at construction time. 
- */ -public class FileBasedClusterNodeFirewall implements ClusterNodeFirewall { - - private final File config; - - private final File restoreDirectory; - - private final Collection<SubnetUtils.SubnetInfo> subnetInfos = new ArrayList<>(); - - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(FileBasedClusterNodeFirewall.class)); - - public FileBasedClusterNodeFirewall(final File config) throws IOException { - this(config, null); - } - - public FileBasedClusterNodeFirewall(final File config, final File restoreDirectory) throws IOException { - - if (config == null) { - throw new IllegalArgumentException("Firewall configuration file may not be null."); - } - - this.config = config; - this.restoreDirectory = restoreDirectory; - - if (restoreDirectory != null) { - // synchronize with restore directory - try { - syncWithRestoreDirectory(); - } catch (final IOException ioe) { - throw new RuntimeException(ioe); - } - } - - if (!config.exists() && !config.createNewFile()) { - throw new IOException("Firewall configuration file did not exist and could not be created: " + config.getAbsolutePath()); - } - - logger.info("Loading cluster firewall configuration."); - parseConfig(config); - logger.info("Cluster firewall configuration loaded."); - } - - @Override - public boolean isPermissible(final String hostOrIp) { - try { - - // if no rules, then permit everything - if (subnetInfos.isEmpty()) { - return true; - } - - final String ip; - try { - ip = InetAddress.getByName(hostOrIp).getHostAddress(); - } catch (final UnknownHostException uhe) { - logger.warn("Blocking unknown host: " + hostOrIp, uhe); - return false; - } - - // check each subnet to see if IP is in range - for (final SubnetUtils.SubnetInfo subnetInfo : subnetInfos) { - if (subnetInfo.isInRange(ip)) { - return true; - } - } - - // no match - return false; - - } catch (final IllegalArgumentException iae) { - return false; - } - } - - private void syncWithRestoreDirectory() throws IOException { - 
- // sanity check that restore directory is a directory, creating it if necessary - FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory); - - // check that restore directory is not the same as the primary directory - if (config.getParentFile().getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) { - throw new IllegalStateException( - String.format("Cluster firewall configuration file '%s' cannot be in the restore directory '%s' ", - config.getAbsolutePath(), restoreDirectory.getAbsolutePath())); - } - - // the restore copy will have same file name, but reside in a different directory - final File restoreFile = new File(restoreDirectory, config.getName()); - - // sync the primary copy with the restore copy - FileUtils.syncWithRestore(config, restoreFile, logger); - - } - - private void parseConfig(final File config) throws IOException { - - // clear old information - subnetInfos.clear(); - try (BufferedReader br = new BufferedReader(new FileReader(config))) { - - String ipOrHostLine; - String ipCidr; - int totalIpsAdded = 0; - while ((ipOrHostLine = br.readLine()) != null) { - - // cleanup whitespace - ipOrHostLine = ipOrHostLine.trim(); - - if (ipOrHostLine.isEmpty() || ipOrHostLine.startsWith("#")) { - // skip empty lines or comments - continue; - } else if (ipOrHostLine.contains("#")) { - // parse out comments in IP containing lines - ipOrHostLine = ipOrHostLine.substring(0, ipOrHostLine.indexOf("#")).trim(); - } - - // if given a complete IP, then covert to CIDR - if (ipOrHostLine.contains("/")) { - ipCidr = ipOrHostLine; - } else if (ipOrHostLine.contains("\\")) { - logger.warn("CIDR IP notation uses forward slashes '/'. 
Replacing backslash '\\' with forward slash'/' for '" + ipOrHostLine + "'"); - ipCidr = ipOrHostLine.replace("\\", "/"); - } else { - try { - ipCidr = InetAddress.getByName(ipOrHostLine).getHostAddress(); - if (!ipOrHostLine.equals(ipCidr)) { - logger.debug(String.format("Resolved host '%s' to ip '%s'", ipOrHostLine, ipCidr)); - } - ipCidr += "/32"; - logger.debug("Adding CIDR to exact IP: " + ipCidr); - } catch (final UnknownHostException uhe) { - logger.warn("Firewall is skipping unknown host address: " + ipOrHostLine); - continue; - } - } - - try { - logger.debug("Adding CIDR IP to firewall: " + ipCidr); - final SubnetUtils subnetUtils = new SubnetUtils(ipCidr); - subnetUtils.setInclusiveHostCount(true); - subnetInfos.add(subnetUtils.getInfo()); - totalIpsAdded++; - } catch (final IllegalArgumentException iae) { - logger.warn("Firewall is skipping invalid CIDR address: " + ipOrHostLine); - } - - } - - if (totalIpsAdded == 0) { - logger.info("No IPs added to firewall. Firewall will accept all requests."); - } else { - logger.info(String.format("Added %d IP(s) to firewall. Only requests originating from the configured IPs will be accepted.", totalIpsAdded)); - } - - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java deleted file mode 100644 index eedb88f..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.flow; - -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.StandardDataFlow; - -/** - * A dataflow with additional information about the cluster. - * - * @author unattributed - */ -public class ClusterDataFlow { - - private final StandardDataFlow dataFlow; - - private final NodeIdentifier primaryNodeId; - - public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId) { - this.dataFlow = dataFlow; - this.primaryNodeId = primaryNodeId; - } - - public NodeIdentifier getPrimaryNodeId() { - return primaryNodeId; - } - - public StandardDataFlow getDataFlow() { - return dataFlow; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java deleted file mode 100644 index 6ff15a7..0000000 --- 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.flow; - -/** - * A base exception for data access exceptions. 
/**
 * Root of the unchecked exceptions raised by data access code.
 *
 * @author unattributed
 */
public class DaoException extends RuntimeException {

    /**
     * Creates an exception with neither a message nor a cause.
     */
    public DaoException() {
        super();
    }

    /**
     * Creates an exception that carries only a message.
     *
     * @param msg the detail message
     */
    public DaoException(final String msg) {
        super(msg);
    }

    /**
     * Creates an exception that wraps an underlying failure.
     *
     * @param cause the underlying failure
     */
    public DaoException(final Throwable cause) {
        super(cause);
    }

    /**
     * Creates an exception that carries a message and wraps an underlying
     * failure.
     *
     * @param msg the detail message
     * @param cause the underlying failure
     */
    public DaoException(final String msg, final Throwable cause) {
        super(msg, cause);
    }
}
/**
 * A data access object for loading and saving the flow managed by the cluster,
 * and for reading and updating that flow's persisted state.
 *
 * @author unattributed
 */
public interface DataFlowDao {

    /**
     * Loads the cluster's dataflow.
     *
     * @return the dataflow or null if no dataflow exists
     *
     * @throws DaoException if the dataflow was unable to be loaded
     */
    ClusterDataFlow loadDataFlow() throws DaoException;

    /**
     * Saves the cluster's dataflow.
     *
     * @param dataFlow the dataflow to save
     *
     * @throws DaoException if the dataflow was unable to be saved
     */
    void saveDataFlow(ClusterDataFlow dataFlow) throws DaoException;

    /**
     * Sets the state of the dataflow. If the dataflow does not exist, then an
     * exception is thrown.
     *
     * @param flowState the state of the dataflow
     *
     * @throws DaoException if the state was unable to be updated
     */
    void setPersistedFlowState(PersistedFlowState flowState) throws DaoException;

    /**
     * Gets the state of the dataflow.
     *
     * @return the state of the dataflow
     *
     * @throws DaoException if the state was unable to be retrieved
     */
    PersistedFlowState getPersistedFlowState() throws DaoException;
}
/**
 * A service for managing the cluster's flow. The service will attempt to keep
 * the cluster's dataflow current while respecting the value of the configured
 * retrieval delay.
 *
 * The eligible retrieval time is reset with the configured delay every time the
 * flow state is set to STALE. If the state is set to UNKNOWN or CURRENT, then
 * the flow will not be retrieved.
 *
 * Clients must call start() and stop() to initialize and stop the instance.
 *
 * @author unattributed
 */
public interface DataFlowManagementService {

    /**
     * Starts the instance. Start may only be called if the instance is not
     * running.
     */
    void start();

    /**
     * Stops the instance. Stop may only be called if the instance is running.
     */
    void stop();

    /**
     * @return true if the instance is started; false otherwise.
     */
    boolean isRunning();

    /**
     * Loads the dataflow.
     *
     * @return the dataflow or null if no dataflow exists
     */
    ClusterDataFlow loadDataFlow();

    /**
     * Updates the dataflow with the given primary node identifier.
     *
     * @param nodeId the node identifier
     *
     * @throws DaoException if the update failed
     */
    void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException;

    /**
     * Sets the state of the flow.
     *
     * @param flowState the state
     *
     * @see PersistedFlowState
     */
    void setPersistedFlowState(PersistedFlowState flowState);

    /**
     * @return the state of the flow
     */
    PersistedFlowState getPersistedFlowState();

    /**
     * @return true if the flow is current; false otherwise.
     */
    boolean isFlowCurrent();

    /**
     * Sets the node identifiers to use when attempting to retrieve the flow.
     *
     * @param nodeIds the node identifiers
     */
    void setNodeIds(Set<NodeIdentifier> nodeIds);

    /**
     * Returns the set of node identifiers the service is using to retrieve the
     * flow.
     *
     * @return the set of node identifiers the service is using to retrieve the
     * flow.
     */
    Set<NodeIdentifier> getNodeIds();

    /**
     * @return the retrieval delay in seconds
     */
    int getRetrievalDelaySeconds();

    /**
     * Sets the retrieval delay.
     *
     * NOTE(review): the delay is passed as a String while the getter reports
     * whole seconds; the accepted string format is not visible here — confirm
     * against the implementation.
     *
     * @param delay the retrieval delay in seconds
     */
    void setRetrievalDelay(String delay);
}
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.flow; - -/** - * Represents the various states of a flow managed by the cluster. - * - * The semantics of the values are: - * <ul> - * <li>CURRENT - the flow is current.</li> - * <li>STALE - the flow is not current, but is eligible to be updated.</li> - * <li>UNKNOWN - the flow is not current and is not eligible to be updated.</li> - * </ul> - * - * @author unattributed - */ -public enum PersistedFlowState { - - CURRENT, - STALE, - UNKNOWN -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java deleted file mode 100644 index ce5a08b..0000000 --- a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/StaleFlowException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.flow; - -/** - * Represents the exceptional case when a caller is requesting the current flow, - * but a current flow is not available. - * - * This is an unchecked exception (it extends RuntimeException). Each - * constructor simply delegates to the corresponding RuntimeException - * constructor: message and cause, message only, cause only, or no arguments. - * - * @author unattributed - */ -public class StaleFlowException extends RuntimeException { - - public StaleFlowException(String message, Throwable cause) { - super(message, cause); - } - - public StaleFlowException(String message) { - super(message); - } - - public StaleFlowException(Throwable cause) { - super(cause); - } - - public StaleFlowException() { - } - -}