http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserGroupEntity.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserGroupEntity.java b/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserGroupEntity.java new file mode 100644 index 0000000..a6542c8 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserGroupEntity.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import javax.xml.bind.annotation.XmlRootElement; +import org.apache.nifi.web.api.dto.UserGroupDTO; + +/** + * A serialized representation of this class can be placed in the entity body of + * a request or response to or from the API. This particular entity holds a + * reference to a UserGroupDTO. + */ +@XmlRootElement(name = "userGroupEntity") +public class UserGroupEntity extends Entity { + + private UserGroupDTO userGroup; + + /** + * The UserGroupDTO that is being serialized. + * + * @return The UserGroupDTO object + */ + public UserGroupDTO getUserGroup() { + return userGroup; + } + + public void setUserGroup(UserGroupDTO userGroup) { + this.userGroup = userGroup; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserSearchResultsEntity.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserSearchResultsEntity.java b/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserSearchResultsEntity.java new file mode 100644 index 0000000..baffe15 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UserSearchResultsEntity.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import java.util.List; +import javax.xml.bind.annotation.XmlRootElement; +import org.apache.nifi.web.api.dto.search.UserGroupSearchResultDTO; +import org.apache.nifi.web.api.dto.search.UserSearchResultDTO; + +/** + * A serialized representation of this class can be placed in the entity body of + * a request or response to or from the API. This particular entity holds a + * reference to UserSearchResultDTOs and UserGroupSearchResultDTOs. + */ +@XmlRootElement(name = "userSearchResultsEntity") +public class UserSearchResultsEntity { + + private List<UserSearchResultDTO> userResults; + private List<UserGroupSearchResultDTO> userGroupResults; + + /** + * The user search results. + * + * @return + */ + public List<UserSearchResultDTO> getUserResults() { + return userResults; + } + + public void setUserResults(List<UserSearchResultDTO> userResults) { + this.userResults = userResults; + } + + /** + * The user group search results. + * + * @return + */ + public List<UserGroupSearchResultDTO> getUserGroupResults() { + return userGroupResults; + } + + public void setUserGroupResults(List<UserGroupSearchResultDTO> userGroupResults) { + this.userGroupResults = userGroupResults; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UsersEntity.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UsersEntity.java b/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UsersEntity.java new file mode 100644 index 0000000..2d11d1f --- /dev/null +++ b/nar-bundles/framework-bundle/framework/client-dto/src/main/java/org/apache/nifi/web/api/entity/UsersEntity.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import java.util.Collection; +import java.util.Date; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.web.api.dto.UserDTO; +import org.apache.nifi.web.api.dto.util.TimeAdapter; + +/** + * A serialized representation of this class can be placed in the entity body of + * a request or response to or from the API. This particular entity holds a + * reference to a collection of UserDTO. + */ +@XmlRootElement(name = "usersEntity") +public class UsersEntity extends Entity { + + private Collection<UserDTO> users; + private Date generated; + + /** + * The collection of UserDTOs that are being serialized. + * + * @return The UserDTO object + */ + public Collection<UserDTO> getUsers() { + return users; + } + + public void setUsers(Collection<UserDTO> users) { + this.users = users; + } + + /** + * When this content was generated. + * + * @return + */ + @XmlJavaTypeAdapter(TimeAdapter.class) + public Date getGenerated() { + return generated; + } + + public void setGenerated(Date generated) { + this.generated = generated; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/.gitignore ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/.gitignore b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/.gitignore new file mode 100755 index 0000000..ea8c4bf --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/.gitignore @@ -0,0 +1 @@ +/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/pom.xml b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/pom.xml new file mode 100644 index 0000000..b8960c3 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/pom.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>cluster-authorization-provider</artifactId> + <name>NiFi Framework Cluster Authority Provider</name> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>file-authorization-provider</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>framework-cluster-protocol</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>framework-cluster</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-socket-utils</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/ClusterManagerAuthorizationProvider.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/ClusterManagerAuthorizationProvider.java b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/ClusterManagerAuthorizationProvider.java new file mode 100644 index 0000000..2b3b38c --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/ClusterManagerAuthorizationProvider.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.authorization; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import org.apache.nifi.authorization.AuthorityProvider; +import org.apache.nifi.authorization.AuthorityProviderConfigurationContext; +import org.apache.nifi.authorization.AuthorityProviderInitializationContext; +import org.apache.nifi.authorization.FileAuthorizationProvider; +import org.apache.nifi.authorization.annotation.AuthorityProviderContext; +import org.apache.nifi.authorization.exception.ProviderCreationException; +import org.apache.nifi.authorization.exception.ProviderDestructionException; +import org.apache.nifi.cluster.authorization.protocol.message.DoesDnExistMessage; +import org.apache.nifi.cluster.authorization.protocol.message.GetAuthoritiesMessage; +import org.apache.nifi.cluster.authorization.protocol.message.GetGroupForUserMessage; +import org.apache.nifi.cluster.authorization.protocol.message.ProtocolMessage; +import static org.apache.nifi.cluster.authorization.protocol.message.ProtocolMessage.MessageType.DOES_DN_EXIST; +import static org.apache.nifi.cluster.authorization.protocol.message.ProtocolMessage.MessageType.GET_AUTHORITIES; +import static org.apache.nifi.cluster.authorization.protocol.message.ProtocolMessage.MessageType.GET_GROUP_FOR_USER; +import org.apache.nifi.cluster.authorization.protocol.message.jaxb.JaxbProtocolUtils; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; +import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.io.socket.ServerSocketConfiguration; +import org.apache.nifi.io.socket.SocketListener; +import org.apache.nifi.io.socket.SocketUtils; +import org.apache.nifi.io.socket.multicast.DiscoverableService; +import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; +import org.apache.nifi.logging.NiFiLog; +import org.apache.nifi.util.NiFiProperties; +import static org.apache.nifi.util.NiFiProperties.CLUSTER_MANAGER_ADDRESS; +import org.apache.nifi.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; + +/** + * Provides authorities for the NCM in clustered environments. Communication + * occurs over TCP/IP sockets. All method calls are deferred to the + * FileAuthorizationProvider. + */ +public class ClusterManagerAuthorizationProvider extends FileAuthorizationProvider implements AuthorityProvider, ApplicationContextAware { + + public static final String AUTHORITY_PROVIDER_SERVIVE_NAME = "cluster-authority-provider"; + + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ClusterManagerAuthorizationProvider.class)); + private static final String CLUSTER_MANAGER_AUTHORITY_PROVIDER_PORT = "Authority Provider Port"; + private static final String CLUSTER_MANAGER_AUTHORITY_PROVIDER_THREADS = "Authority Provider Threads"; + private static final int DEFAULT_CLUSTER_MANAGER_AUTHORITY_PROVIDER_THREADS = 10; + + private WebClusterManager clusterManager; + private ProtocolContext<ProtocolMessage> authorityProviderProtocolContext; + private SocketListener socketListener; + private NiFiProperties properties; + private ApplicationContext applicationContext; + + @Override + public void initialize(final AuthorityProviderInitializationContext initializationContext) throws ProviderCreationException { + super.initialize(initializationContext); + } + + @Override + public void onConfigured(final AuthorityProviderConfigurationContext configurationContext) throws ProviderCreationException { + super.onConfigured(configurationContext); + + // get the socket address of the cluster authority provider + final InetSocketAddress clusterAuthorityProviderAddress = getClusterManagerAuthorityProviderAddress(configurationContext); + + // get the cluster manager + clusterManager = applicationContext.getBean("clusterManager", WebClusterManager.class); + + // if using multicast, then the authority provider's service is broadcasted + if (properties.getClusterProtocolUseMulticast()) { + + // create the authority provider service for discovery + final DiscoverableService clusterAuthorityProviderService = new DiscoverableServiceImpl(AUTHORITY_PROVIDER_SERVIVE_NAME, clusterAuthorityProviderAddress); + + // register the authority provider service with the cluster manager + clusterManager.addBroadcastedService(clusterAuthorityProviderService); + } + + // get the number of protocol listening thread + final int numThreads = getClusterManagerAuthorityProviderThreads(configurationContext); + + // the server socket configuration + final ServerSocketConfiguration configuration = applicationContext.getBean("protocolServerSocketConfiguration", ServerSocketConfiguration.class); + + // the authority provider listens for node messages + socketListener = new SocketListener(numThreads, clusterAuthorityProviderAddress.getPort(), configuration) { + @Override + public void dispatchRequest(final Socket socket) { + ClusterManagerAuthorizationProvider.this.dispatchRequest(socket); + } + }; + + // start the socket listener + if (socketListener != null && !socketListener.isRunning()) { + try { + socketListener.start(); + } catch (final IOException ioe) { + throw new ProviderCreationException("Failed to start Cluster Manager Authorization Provider due to: " + ioe, ioe); + } + } + + // initialize the protocol context + authorityProviderProtocolContext = new JaxbProtocolContext<ProtocolMessage>(JaxbProtocolUtils.JAXB_CONTEXT); + } + + @Override + public void preDestruction() throws ProviderDestructionException { + if (socketListener != null && socketListener.isRunning()) { + try { + socketListener.stop(); + } catch (final IOException ioe) { + throw new ProviderDestructionException("Failed to stop Cluster Manager Authorization Provider due to: " + ioe, ioe); + } + } + super.preDestruction(); + } + + private int getClusterManagerAuthorityProviderThreads(final AuthorityProviderConfigurationContext configurationContext) { + try { + return Integer.parseInt(configurationContext.getProperty(CLUSTER_MANAGER_AUTHORITY_PROVIDER_THREADS)); + } catch (NumberFormatException nfe) { + return DEFAULT_CLUSTER_MANAGER_AUTHORITY_PROVIDER_THREADS; + } + } + + private InetSocketAddress getClusterManagerAuthorityProviderAddress(final AuthorityProviderConfigurationContext configurationContext) { + try { + String socketAddress = properties.getProperty(CLUSTER_MANAGER_ADDRESS); + if (StringUtils.isBlank(socketAddress)) { + socketAddress = "localhost"; + } + return InetSocketAddress.createUnresolved(socketAddress, getClusterManagerAuthorityProviderPort(configurationContext)); + } catch (Exception ex) { + throw new RuntimeException("Invalid manager authority provider address/port due to: " + ex, ex); + } + } + + private Integer getClusterManagerAuthorityProviderPort(final AuthorityProviderConfigurationContext configurationContext) { + final String authorityProviderPort = configurationContext.getProperty(CLUSTER_MANAGER_AUTHORITY_PROVIDER_PORT); + if (authorityProviderPort == null || authorityProviderPort.trim().isEmpty()) { + throw new ProviderCreationException("The authority provider port must be specified."); + } + + return Integer.parseInt(authorityProviderPort); + } + + private void dispatchRequest(final Socket socket) { + try { + // unmarshall message + final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = authorityProviderProtocolContext.createUnmarshaller(); + final ProtocolMessage request = unmarshaller.unmarshal(socket.getInputStream()); + final ProtocolMessage response = request; + + try { + switch (request.getType()) { + case DOES_DN_EXIST: { + final DoesDnExistMessage castedMsg = (DoesDnExistMessage) request; + castedMsg.setResponse(doesDnExist(castedMsg.getDn())); + break; + } + case GET_AUTHORITIES: { + final GetAuthoritiesMessage castedMsg = (GetAuthoritiesMessage) request; + castedMsg.setResponse(getAuthorities(castedMsg.getDn())); + break; + } + case GET_GROUP_FOR_USER: { + final GetGroupForUserMessage castedMsg = (GetGroupForUserMessage) request; + castedMsg.setResponse(getGroupForUser(castedMsg.getDn())); + break; + } + default: { + throw new Exception("Unsupported Message Type: " + request.getType()); + } + } + } catch (final Exception ex) { + response.setExceptionClass(ex.getClass().getName()); + response.setExceptionMessage(ex.getMessage()); + } + + final ProtocolMessageMarshaller<ProtocolMessage> marshaller = authorityProviderProtocolContext.createMarshaller(); + marshaller.marshal(response, socket.getOutputStream()); + + } catch (final Exception e) { + logger.warn("Failed processing Socket Authorization Provider protocol message due to " + e, e); + } finally { + SocketUtils.closeQuietly(socket); + } + } + + @Override + @AuthorityProviderContext + public void setApplicationContext(final ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + @Override + @AuthorityProviderContext + public void setNiFiProperties(NiFiProperties properties) { + super.setNiFiProperties(properties); + this.properties = properties; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/NodeAuthorizationProvider.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/NodeAuthorizationProvider.java b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/NodeAuthorizationProvider.java new file mode 100644 index 0000000..c81e9d0 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/NodeAuthorizationProvider.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.authorization; + +import org.apache.nifi.cluster.authorization.protocol.message.DoesDnExistMessage; +import org.apache.nifi.cluster.authorization.protocol.message.GetAuthoritiesMessage; +import org.apache.nifi.cluster.authorization.protocol.message.ProtocolMessage; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.nifi.authorization.Authority; +import org.apache.nifi.authorization.AuthorityProvider; +import org.apache.nifi.authorization.AuthorityProviderConfigurationContext; +import org.apache.nifi.authorization.AuthorityProviderInitializationContext; +import org.apache.nifi.authorization.annotation.AuthorityProviderContext; +import org.apache.nifi.authorization.exception.AuthorityAccessException; +import org.apache.nifi.authorization.exception.IdentityAlreadyExistsException; +import org.apache.nifi.authorization.exception.ProviderCreationException; +import org.apache.nifi.authorization.exception.ProviderDestructionException; +import org.apache.nifi.authorization.exception.UnknownIdentityException; +import org.apache.nifi.cluster.authorization.protocol.message.GetGroupForUserMessage; +import org.apache.nifi.cluster.authorization.protocol.message.jaxb.JaxbProtocolUtils; +import org.apache.nifi.io.socket.SocketConfiguration; +import org.apache.nifi.io.socket.SocketUtils; +import org.apache.nifi.io.socket.multicast.DiscoverableService; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; +import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; +import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery; +import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; +import org.apache.nifi.io.socket.multicast.MulticastConfiguration; +import org.apache.nifi.logging.NiFiLog; +import org.apache.nifi.util.NiFiProperties; +import static org.apache.nifi.util.NiFiProperties.CLUSTER_NODE_UNICAST_MANAGER_ADDRESS; +import org.apache.nifi.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; + +/** + * Provides authorities for nodes in clustered environments. Communication + * occurs over TCP/IP sockets. All method calls are communicated to the cluster + * manager provider via socket. + */ +public class NodeAuthorizationProvider implements AuthorityProvider, ApplicationContextAware { + + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(NodeAuthorizationProvider.class)); + private static final String CLUSTER_NODE_MANAGER_AUTHORITY_PROVIDER_PORT = "Cluster Manager Authority Provider Port"; + + private ProtocolContext<ProtocolMessage> authorityProviderProtocolContext; + private SocketConfiguration socketConfiguration; + private ClusterServiceLocator serviceLocator; + private ApplicationContext applicationContext; + private NiFiProperties properties; + + @Override + public void initialize(AuthorityProviderInitializationContext initializationContext) throws ProviderCreationException { + } + + @Override + public void onConfigured(final AuthorityProviderConfigurationContext configurationContext) throws ProviderCreationException { + // TODO clear user cache? + + // if using multicast, then the authority provider's service is broadcasted + if (properties.getClusterProtocolUseMulticast()) { + // create the service discovery + final ClusterServiceDiscovery serviceDiscovery = new ClusterServiceDiscovery( + ClusterManagerAuthorizationProvider.AUTHORITY_PROVIDER_SERVIVE_NAME, + properties.getClusterProtocolMulticastAddress(), + applicationContext.getBean("protocolMulticastConfiguration", MulticastConfiguration.class), + applicationContext.getBean("protocolContext", ProtocolContext.class)); + + // create service location configuration + final ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig(); + config.setNumAttempts(3); + config.setTimeBetweenAttempts(1); + config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS); + + serviceLocator = new ClusterServiceLocator(serviceDiscovery); + serviceLocator.setAttemptsConfig(config); + } else { + final InetSocketAddress serviceAddress = getClusterNodeManagerAuthorityProviderAddress(configurationContext); + final DiscoverableService service = new DiscoverableServiceImpl(ClusterManagerAuthorizationProvider.AUTHORITY_PROVIDER_SERVIVE_NAME, serviceAddress); + serviceLocator = new ClusterServiceLocator(service); + } + + try { + // start the service locator + serviceLocator.start(); + } catch (final IOException ioe) { + throw new ProviderCreationException(ioe); + } + + // the socket configuration + socketConfiguration = applicationContext.getBean("protocolSocketConfiguration", SocketConfiguration.class); + + // initialize the protocol context + authorityProviderProtocolContext = new JaxbProtocolContext<ProtocolMessage>(JaxbProtocolUtils.JAXB_CONTEXT); + } + + private InetSocketAddress getClusterNodeManagerAuthorityProviderAddress(final AuthorityProviderConfigurationContext configurationContext) { + try { + String socketAddress = properties.getProperty(CLUSTER_NODE_UNICAST_MANAGER_ADDRESS); + if (StringUtils.isBlank(socketAddress)) { + socketAddress = "localhost"; + } + return InetSocketAddress.createUnresolved(socketAddress, getClusterNodeManagerAuthorityProviderPort(configurationContext)); + } catch (Exception ex) { + throw new ProviderCreationException("Invalid cluster manager authority provider address/port due to: " + ex, ex); + } + } + + private Integer getClusterNodeManagerAuthorityProviderPort(final AuthorityProviderConfigurationContext configurationContext) { + final String nodeAuthorityProviderPort = configurationContext.getProperty(CLUSTER_NODE_MANAGER_AUTHORITY_PROVIDER_PORT); + if (nodeAuthorityProviderPort == null || nodeAuthorityProviderPort.trim().isEmpty()) { + throw new ProviderCreationException("The cluster manager authority provider port must be specified."); + } + + return Integer.parseInt(nodeAuthorityProviderPort); + } + + @Override + public void setAuthorities(String dn, Set<Authority> authorities) throws AuthorityAccessException { + throw new AuthorityAccessException("Nodes are not allowed to set user authorities."); + } + + @Override + public void addUser(String dn, String group) throws IdentityAlreadyExistsException, AuthorityAccessException { + throw new AuthorityAccessException("Nodes are not allowed to add users."); + } + + @Override + public boolean doesDnExist(String dn) throws AuthorityAccessException { + // create message + final DoesDnExistMessage msg = new DoesDnExistMessage(); + msg.setDn(dn); + + Socket socket = null; + try { + + final InetSocketAddress socketAddress = getServiceAddress(); + if (socketAddress == null) { + throw new AuthorityAccessException("Cluster Authority Provider's address is not known."); + } + + try { + // create a socket + socket = SocketUtils.createSocket(socketAddress, socketConfiguration); + } catch (final IOException ioe) { + throw new AuthorityAccessException("Failed to create socket due to: " + ioe, ioe); + } + + try { + // marshal message to output stream + final ProtocolMessageMarshaller marshaller = authorityProviderProtocolContext.createMarshaller(); + marshaller.marshal(msg, socket.getOutputStream()); + } catch (final IOException ioe) { + throw new AuthorityAccessException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); + } + + try { + + // unmarshall response and return + final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = authorityProviderProtocolContext.createUnmarshaller(); + final DoesDnExistMessage response = (DoesDnExistMessage) unmarshaller.unmarshal(socket.getInputStream()); + + // check if there was an exception + if (response.wasException()) { + throw new AuthorityAccessException(response.getExceptionMessage()); + } + + // return provider's response + return response.getResponse(); + + } catch (final IOException ioe) { + throw new AuthorityAccessException("Failed unmarshalling '" + msg.getType() + "' response protocol message due to: " + ioe, ioe); + } + + } finally { + SocketUtils.closeQuietly(socket); + } + } + + @Override + public Set<Authority> getAuthorities(String dn) throws UnknownIdentityException, AuthorityAccessException { + // create message + final GetAuthoritiesMessage msg = new GetAuthoritiesMessage(); + msg.setDn(dn); + + Socket socket = null; + try { + + final InetSocketAddress socketAddress = getServiceAddress(); + if (socketAddress == null) { + throw new AuthorityAccessException("Cluster Authority Provider's address is not known."); + } + + try { + // create a socket + socket = SocketUtils.createSocket(socketAddress, socketConfiguration); + } catch (final IOException ioe) { + throw new AuthorityAccessException("Failed to create socket due to: " + ioe, ioe); + } + + try { + // marshal message to output stream + final ProtocolMessageMarshaller marshaller = authorityProviderProtocolContext.createMarshaller(); + marshaller.marshal(msg, socket.getOutputStream()); + } catch (final IOException ioe) { + throw new AuthorityAccessException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); + } + + try { + + // unmarshall response and return + final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = authorityProviderProtocolContext.createUnmarshaller(); + final GetAuthoritiesMessage response = (GetAuthoritiesMessage) unmarshaller.unmarshal(socket.getInputStream()); + + // check if there was an exception + if (response.wasException()) { + if (isException(UnknownIdentityException.class, response)) { + throw new UnknownIdentityException(response.getExceptionMessage()); + } else { + throw new AuthorityAccessException(response.getExceptionMessage()); + } + } + + // return provider's response + return response.getResponse(); + + } catch (final IOException ioe) { + throw new AuthorityAccessException("Failed unmarshalling '" + msg.getType() + "' response protocol message due to: " + ioe, ioe); + } + + } finally { + SocketUtils.closeQuietly(socket); + } + } + + @Override + public Set<String> getUsers(Authority authority) throws AuthorityAccessException { + throw new AuthorityAccessException("Nodes are not allowed to get users for a given authority."); + } + + @Override + public void revokeUser(String dn) throws UnknownIdentityException, AuthorityAccessException { + throw new AuthorityAccessException("Nodes are not allowed to revoke users."); + } + + @Override + public void setUsersGroup(Set<String> dns, String group) throws UnknownIdentityException, AuthorityAccessException { + throw new AuthorityAccessException("Nodes are not allowed to set user groups."); + } + + @Override + public void ungroupUser(String dn) throws UnknownIdentityException, AuthorityAccessException { + throw new AuthorityAccessException("Nodes are not allowed to ungroup users."); + } + + @Override + public void ungroup(String group) throws AuthorityAccessException { + throw new AuthorityAccessException("Nodes are not allowed to ungroup."); + } + + @Override + public String getGroupForUser(String dn) throws UnknownIdentityException, AuthorityAccessException { + // create message + final GetGroupForUserMessage msg = new GetGroupForUserMessage(); + msg.setDn(dn); + + Socket socket = null; + try { + + final InetSocketAddress socketAddress = getServiceAddress(); + if (socketAddress == null) { + throw new AuthorityAccessException("Cluster Authority Provider's address is not known."); + } + + try { + // create a socket + socket = SocketUtils.createSocket(socketAddress, socketConfiguration); + } catch (final IOException ioe) { + throw new AuthorityAccessException("Failed to create socket due to: " + ioe, ioe); + } + + try { + // marshal message to output stream + final ProtocolMessageMarshaller marshaller = authorityProviderProtocolContext.createMarshaller(); + marshaller.marshal(msg, socket.getOutputStream()); + } catch (final IOException ioe) { + throw new AuthorityAccessException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); + } + + try { + + // unmarshall response and return + final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = authorityProviderProtocolContext.createUnmarshaller(); + final GetGroupForUserMessage response = (GetGroupForUserMessage) unmarshaller.unmarshal(socket.getInputStream()); + + // check if there was an exception + if (response.wasException()) { + if (isException(UnknownIdentityException.class, response)) { + throw new UnknownIdentityException(response.getExceptionMessage()); + } else { + throw new AuthorityAccessException(response.getExceptionMessage()); + } + } + + return response.getResponse(); + } catch (final IOException ioe) { + throw new AuthorityAccessException("Failed unmarshalling '" + msg.getType() + "' response protocol message due to: " + ioe, ioe); + } + + } finally { + SocketUtils.closeQuietly(socket); + } + } + + @Override + public void revokeGroup(String group) throws UnknownIdentityException, AuthorityAccessException { + throw new AuthorityAccessException("Nodes are not allowed to revoke groups."); + } + + @Override + public void preDestruction() throws ProviderDestructionException { + try { + if (serviceLocator != null && serviceLocator.isRunning()) { + serviceLocator.stop(); + } + } catch (final IOException ioe) { + throw new ProviderDestructionException(ioe); + } + } + + @Override + @AuthorityProviderContext + public void setApplicationContext(final ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + @AuthorityProviderContext + public void setNiFiProperties(NiFiProperties properties) { + this.properties = properties; + } + + private InetSocketAddress getServiceAddress() { + final DiscoverableService service = serviceLocator.getService(); + if (service != null) { + return service.getServiceAddress(); + } + return null; + } + + private boolean isException(final Class<? extends Exception> exception, final ProtocolMessage protocolMessage) { + if (protocolMessage.wasException()) { + return exception.getName().equals(protocolMessage.getExceptionClass()); + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/DoesDnExistMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/DoesDnExistMessage.java b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/DoesDnExistMessage.java new file mode 100644 index 0000000..38d0dd8 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/DoesDnExistMessage.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.authorization.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; +import org.apache.nifi.cluster.authorization.protocol.message.ProtocolMessage.MessageType; + +/** + * @author unattributed + */ +@XmlRootElement(name = "doesDnExistMessage") +public class DoesDnExistMessage extends ProtocolMessage { + + private String dn; + + private boolean response; + + public DoesDnExistMessage() { + } + + @Override + public MessageType getType() { + return MessageType.DOES_DN_EXIST; + } + + public String getDn() { + return dn; + } + + public void setDn(String dn) { + this.dn = dn; + } + + public boolean getResponse() { + return response; + } + + public void setResponse(boolean response) { + this.response = response; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetAuthoritiesMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetAuthoritiesMessage.java b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetAuthoritiesMessage.java new file mode 100644 index 0000000..347163f --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetAuthoritiesMessage.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.authorization.protocol.message; + +import java.util.HashSet; +import java.util.Set; +import javax.xml.bind.annotation.XmlRootElement; +import org.apache.nifi.authorization.Authority; + +/** + * @author unattributed + */ +@XmlRootElement(name = "getAuthoritiesMessage") +public class GetAuthoritiesMessage extends ProtocolMessage { + + private String dn; + + private Set<Authority> response = new HashSet<>(); + + public GetAuthoritiesMessage() { + } + + @Override + public MessageType getType() { + return MessageType.GET_AUTHORITIES; + } + + public String getDn() { + return dn; + } + + public void setDn(String dn) { + this.dn = dn; + } + + public Set<Authority> getResponse() { + return response; + } + + public void setResponse(Set<Authority> response) { + this.response = response; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetGroupForUserMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetGroupForUserMessage.java b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetGroupForUserMessage.java new file mode 100644 index 0000000..717f244 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/GetGroupForUserMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.authorization.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * @author unattributed + */ +@XmlRootElement(name = "getGroupForUserMessage") +public class GetGroupForUserMessage extends ProtocolMessage { + + private String dn; + + private String response; + + public GetGroupForUserMessage() { + } + + @Override + public MessageType getType() { + return MessageType.GET_GROUP_FOR_USER; + } + + public String getDn() { + return dn; + } + + public void setDn(String dn) { + this.dn = dn; + } + + public String getResponse() { + return response; + } + + public void setResponse(String response) { + this.response = response; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/ProtocolMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/ProtocolMessage.java b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/ProtocolMessage.java new file mode 100644 index 0000000..102142a --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/ProtocolMessage.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.authorization.protocol.message; + +/** + * @author unattributed + */ +public abstract class ProtocolMessage { + + private String exceptionClass; + private String exceptionMessage; + + public static enum MessageType { + + DOES_DN_EXIST, + GET_AUTHORITIES, + GET_USERS, + GET_GROUP_FOR_USER + } + + public abstract MessageType getType(); + + public boolean wasException() { + return exceptionClass != null; + } + + public String getExceptionMessage() { + return exceptionMessage; + } + + public void setExceptionMessage(final String exceptionMessage) { + this.exceptionMessage = exceptionMessage; + } + + public String getExceptionClass() { + return exceptionClass; + } + + public void setExceptionClass(String exceptionClass) { + this.exceptionClass = exceptionClass; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/JaxbProtocolUtils.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/JaxbProtocolUtils.java b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/JaxbProtocolUtils.java new file mode 100644 index 0000000..97a1bc7 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/JaxbProtocolUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.authorization.protocol.message.jaxb; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; + +/** + * @author unattributed + */ +public final class JaxbProtocolUtils { + + public static final String JAXB_CONTEXT_PATH = ObjectFactory.class.getPackage().getName(); + + public static final JAXBContext JAXB_CONTEXT = initializeJaxbContext(); + + /** + * Load the JAXBContext version. + */ + private static JAXBContext initializeJaxbContext() { + try { + return JAXBContext.newInstance(JAXB_CONTEXT_PATH); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext."); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/ObjectFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/ObjectFactory.java b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/ObjectFactory.java new file mode 100644 index 0000000..5cde335 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/java/org/apache/nifi/cluster/authorization/protocol/message/jaxb/ObjectFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.authorization.protocol.message.jaxb; + +import javax.xml.bind.annotation.XmlRegistry; +import org.apache.nifi.cluster.authorization.protocol.message.DoesDnExistMessage; +import org.apache.nifi.cluster.authorization.protocol.message.GetAuthoritiesMessage; +import org.apache.nifi.cluster.authorization.protocol.message.GetGroupForUserMessage; + +/** + * @author unattributed + */ +@XmlRegistry +public class ObjectFactory { + + public ObjectFactory() { + } + + public DoesDnExistMessage createDoesDnExistMessage() { + return new DoesDnExistMessage(); + } + + public GetAuthoritiesMessage createGetAuthoritiesMessage() { + return new GetAuthoritiesMessage(); + } + + public GetGroupForUserMessage createGetGroupForUserMessage() { + return new GetGroupForUserMessage(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider new file mode 100644 index 0000000..56f4c3e --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.cluster.authorization.ClusterManagerAuthorizationProvider +org.apache.nifi.cluster.authorization.NodeAuthorizationProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/.gitignore ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/.gitignore b/nar-bundles/framework-bundle/framework/cluster-protocol/.gitignore new file mode 100755 index 0000000..ea8c4bf --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/.gitignore @@ -0,0 +1 @@ +/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml b/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml new file mode 100644 index 0000000..5351085 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml @@ -0,0 +1,69 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <artifactId>framework-cluster-protocol</artifactId> + <packaging>jar</packaging> + <name>NiFi Framework Cluster Protocol</name> + <description>The messaging protocol for clustered NiFi</description> + <dependencies> + + <!-- application dependencies --> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-properties</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-logging-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-socket-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-security</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>core-api</artifactId> + </dependency> + + <!-- spring dependencies --> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java new file mode 100644 index 0000000..fa1547f --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol; + +import org.apache.nifi.cluster.protocol.message.DisconnectMessage; +import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; +import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; +import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; +import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; +import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; +import org.apache.nifi.reporting.BulletinRepository; + +/** + * An interface for sending protocol messages from the cluster manager to nodes. + * + * @author unattributed + */ +public interface ClusterManagerProtocolSender { + + /** + * Sends a "flow request" message to a node. + * @param msg a message + * @return the response + * @throws ProtocolException if communication failed + */ + FlowResponseMessage requestFlow(FlowRequestMessage msg) throws ProtocolException; + + /** + * Sends a "reconnection request" message to a node. + * @param msg a message + * @return + * @throws ProtocolException if communication failed + */ + ReconnectionResponseMessage requestReconnection(ReconnectionRequestMessage msg) throws ProtocolException; + + /** + * Sends a "disconnection request" message to a node. + * @param msg a message + * @throws ProtocolException if communication failed + */ + void disconnect(DisconnectMessage msg) throws ProtocolException; + + /** + * Sends an "assign primary role" message to a node. + * @param msg a message + * @throws ProtocolException if communication failed + */ + void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException; + + /** + * Sets the {@link BulletinRepository} that can be used to report bulletins + * @param bulletinRepository + */ + void setBulletinRepository(final BulletinRepository bulletinRepository); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java new file mode 100644 index 0000000..1b5d007 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol; + +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionRequestAdapter; + +/** + * A node's request to connect to the cluster. The request contains a proposed + * identifier. + * + * @author unattributed + */ +@XmlJavaTypeAdapter(ConnectionRequestAdapter.class) +public class ConnectionRequest { + + private final NodeIdentifier proposedNodeIdentifier; + + public ConnectionRequest(final NodeIdentifier proposedNodeIdentifier) { + if(proposedNodeIdentifier == null) { + throw new IllegalArgumentException("Proposed node identifier may not be null."); + } + this.proposedNodeIdentifier = proposedNodeIdentifier; + } + + public NodeIdentifier getProposedNodeIdentifier() { + return proposedNodeIdentifier; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java new file mode 100644 index 0000000..7a5ff2b --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol; + +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionResponseAdapter; + +/** + * The cluster manager's response to a node's connection request. If the manager + * has a current copy of the data flow, then it is returned with a node identifier + * to the node. Otherwise, the manager will provide a "try again in X seconds" + * response to the node in hopes that a current data flow will be available upon + * subsequent requests. + * + * @author unattributed + */ +@XmlJavaTypeAdapter(ConnectionResponseAdapter.class) +public class ConnectionResponse { + + private final boolean blockedByFirewall; + private final int tryLaterSeconds; + private final NodeIdentifier nodeIdentifier; + private final StandardDataFlow dataFlow; + private final boolean primary; + private final Integer managerRemoteInputPort; + private final Boolean managerRemoteCommsSecure; + private final String instanceId; + + private volatile String clusterManagerDN; + + public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary, + final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) { + if(nodeIdentifier == null) { + throw new IllegalArgumentException("Node identifier may not be empty or null."); + } else if(dataFlow == null) { + throw new IllegalArgumentException("DataFlow may not be null."); + } + this.nodeIdentifier = nodeIdentifier; + this.dataFlow = dataFlow; + this.tryLaterSeconds = 0; + this.blockedByFirewall = false; + this.primary = primary; + this.managerRemoteInputPort = managerRemoteInputPort; + this.managerRemoteCommsSecure = managerRemoteCommsSecure; + this.instanceId = instanceId; + } + + public ConnectionResponse(final int tryLaterSeconds) { + if(tryLaterSeconds <= 0) { + throw new IllegalArgumentException("Try-Later seconds may not be nonnegative: " + tryLaterSeconds); + } + this.dataFlow = null; + this.nodeIdentifier = null; + this.tryLaterSeconds = tryLaterSeconds; + this.blockedByFirewall = false; + this.primary = false; + this.managerRemoteInputPort = null; + this.managerRemoteCommsSecure = null; + this.instanceId = null; + } + + private ConnectionResponse() { + this.dataFlow = null; + this.nodeIdentifier = null; + this.tryLaterSeconds = 0; + this.blockedByFirewall = true; + this.primary = false; + this.managerRemoteInputPort = null; + this.managerRemoteCommsSecure = null; + this.instanceId = null; + } + + public static ConnectionResponse createBlockedByFirewallResponse() { + return new ConnectionResponse(); + } + + public boolean isPrimary() { + return primary; + } + + public boolean shouldTryLater() { + return tryLaterSeconds > 0; + } + + public boolean isBlockedByFirewall() { + return blockedByFirewall; + } + + public int getTryLaterSeconds() { + return tryLaterSeconds; + } + + public StandardDataFlow getDataFlow() { + return dataFlow; + } + + public NodeIdentifier getNodeIdentifier() { + return nodeIdentifier; + } + + public Integer getManagerRemoteInputPort() { + return managerRemoteInputPort; + } + + public Boolean isManagerRemoteCommsSecure() { + return managerRemoteCommsSecure; + } + + public String getInstanceId() { + return instanceId; + } + + public void setClusterManagerDN(final String dn) { + this.clusterManagerDN = dn; + } + + /** + * Returns the DN of the NCM, if it is available or <code>null</code> otherwise. + * + * @return + */ + public String getClusterManagerDN() { + return clusterManagerDN; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java new file mode 100644 index 0000000..67324a1 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol; + +import java.util.Date; +import javax.xml.bind.annotation.XmlTransient; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter; + +/** + * A heartbeat for indicating the status of a node to the cluster. + * @author unattributed + */ +@XmlJavaTypeAdapter(HeartbeatAdapter.class) +public class Heartbeat { + + private final NodeIdentifier nodeIdentifier; + private final boolean primary; + private final boolean connected; + private final long createdTimestamp; + private final byte[] payload; + + public Heartbeat(final NodeIdentifier nodeIdentifier, final boolean primary, final boolean connected, final byte[] payload) { + if(nodeIdentifier == null) { + throw new IllegalArgumentException("Node Identifier may not be null."); + } + this.nodeIdentifier = nodeIdentifier; + this.primary = primary; + this.connected = connected; + this.payload = payload; + this.createdTimestamp = new Date().getTime(); + } + + public NodeIdentifier getNodeIdentifier() { + return nodeIdentifier; + } + + public byte[] getPayload() { + return payload; + } + + public boolean isPrimary() { + return primary; + } + + public boolean isConnected() { + return connected; + } + + @XmlTransient + public long getCreatedTimestamp() { + return createdTimestamp; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java new file mode 100644 index 0000000..a120524 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol; + +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.cluster.protocol.jaxb.message.NodeBulletinsAdapter; + +/** + * + */ +@XmlJavaTypeAdapter(NodeBulletinsAdapter.class) +public class NodeBulletins { + + private final NodeIdentifier nodeIdentifier; + private final byte[] payload; + + public NodeBulletins(NodeIdentifier nodeIdentifier, byte[] payload) { + this.nodeIdentifier = nodeIdentifier; + this.payload = payload; + } + + public NodeIdentifier getNodeIdentifier() { + return nodeIdentifier; + } + + public byte[] getPayload() { + return payload; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java new file mode 100644 index 0000000..1893186 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol; + +import org.apache.commons.lang3.StringUtils; + +/** + * A node identifier denoting the coordinates of a flow controller that is connected + * to a cluster. Nodes provide an external public API interface and an internal private + * interface for communicating with the cluster. + * + * The external API interface and internal protocol each require an IP or hostname + * as well as a port for communicating. + * + * This class overrides hashCode and equals and considers two instances to be + * equal if they have the equal IDs. + * + * @author unattributed + * @Immutable + * @Threadsafe + */ +public class NodeIdentifier { + + /** the unique identifier for the node */ + private final String id; + + /** the IP or hostname to use for sending requests to the node's external interface */ + private final String apiAddress; + + /** the port to use use for sending requests to the node's external interface */ + private final int apiPort; + + /** the IP or hostname to use for sending requests to the node's internal interface */ + private final String socketAddress; + + /** the port to use use for sending requests to the node's internal interface */ + private final int socketPort; + + private final String nodeDn; + + public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort) { + this(id, apiAddress, apiPort, socketAddress, socketPort, null); + } + + public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, final String dn) { + + if(StringUtils.isBlank(id)) { + throw new IllegalArgumentException("Node ID may not be empty or null."); + } else if(StringUtils.isBlank(apiAddress)) { + throw new IllegalArgumentException("Node API address may not be empty or null."); + } else if(StringUtils.isBlank(socketAddress)) { + throw new IllegalArgumentException("Node socket address may not be empty or null."); + } + + validatePort(apiPort); + validatePort(socketPort); + + this.id = id; + this.apiAddress = apiAddress; + this.apiPort = apiPort; + this.socketAddress = socketAddress; + this.socketPort = socketPort; + this.nodeDn = dn; + } + + public String getId() { + return id; + } + + public String getDN() { + return nodeDn; + } + + public String getApiAddress() { + return apiAddress; + } + + public int getApiPort() { + return apiPort; + } + + public String getSocketAddress() { + return socketAddress; + } + + public int getSocketPort() { + return socketPort; + } + + private void validatePort(final int port) { + if(port < 1 || port > 65535) { + throw new IllegalArgumentException("Port must be inclusively in the range [1, 65535]. Port given: " + port); + } + } + + /** + * Compares the id of two node identifiers for equality. + * + * @param obj a node identifier + * + * @return true if the id is equal; false otherwise + */ + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final NodeIdentifier other = (NodeIdentifier) obj; + if ((this.id == null) ? (other.id != null) : !this.id.equals(other.id)) { + return false; + } + return true; + } + + /** + * Compares API address/port and socket address/port for equality. The + * id is not used for comparison. + * + * @param other a node identifier + * + * @return true if API address/port and socket address/port are equal; false + * otherwise + */ + public boolean logicallyEquals(final NodeIdentifier other) { + if(other == null) { + return false; + } + if ((this.apiAddress == null) ? (other.apiAddress != null) : !this.apiAddress.equals(other.apiAddress)) { + return false; + } + if(this.apiPort != other.apiPort) { + return false; + } + if ((this.socketAddress == null) ? (other.socketAddress != null) : !this.socketAddress.equals(other.socketAddress)) { + return false; + } + if(this.socketPort != other.socketPort) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int hash = 7; + hash = 31 * hash + (this.id != null ? this.id.hashCode() : 0); + return hash; + } + + @Override + public String toString() { + return "[" + "id=" + id + ", apiAddress=" + apiAddress + ", apiPort=" + apiPort + ", socketAddress=" + socketAddress + ", socketPort=" + socketPort + ']'; + } + +}