http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowResponseMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowResponseMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowResponseMessage.java new file mode 100644 index 0000000..0d34dae --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowResponseMessage.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.cluster.protocol.StandardDataFlow; + +/** + * @author unattributed + */ +@XmlRootElement(name = "flowResponseMessage") +public class FlowResponseMessage extends ProtocolMessage { + + private StandardDataFlow dataFlow; + + @Override + public MessageType getType() { + return MessageType.FLOW_RESPONSE; + } + + public StandardDataFlow getDataFlow() { + return dataFlow; + } + + public void setDataFlow(StandardDataFlow dataFlow) { + this.dataFlow = dataFlow; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java new file mode 100644 index 0000000..0064cb6 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatMessage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import org.apache.nifi.cluster.protocol.Heartbeat; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * @author unattributed + */ +@XmlRootElement(name = "heartbeatMessage") +public class HeartbeatMessage extends ProtocolMessage { + + private Heartbeat heartbeat; + + @Override + public MessageType getType() { + return MessageType.HEARTBEAT; + } + + public Heartbeat getHeartbeat() { + return heartbeat; + } + + public void setHeartbeat(Heartbeat heartbeat) { + this.heartbeat = heartbeat; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/MulticastProtocolMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/MulticastProtocolMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/MulticastProtocolMessage.java new file mode 100644 index 0000000..c6d2d44 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/MulticastProtocolMessage.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * Wraps a protocol message and an identifier for sending the message by way + * multicast. The identifier is necessary for the sender to identify a message + * sent by it. + * + * @author unattributed + */ +@XmlRootElement(name = "multicastMessage") +public class MulticastProtocolMessage extends ProtocolMessage { + + private ProtocolMessage protocolMessage; + + private String id; + + public MulticastProtocolMessage() {} + + public MulticastProtocolMessage(final String id, final ProtocolMessage protocolMessage) { + this.protocolMessage = protocolMessage; + this.id = id; + } + + @Override + public MessageType getType() { + if(protocolMessage == null) { + return null; + } + return protocolMessage.getType(); + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public ProtocolMessage getProtocolMessage() { + return protocolMessage; + } + + public void setProtocolMessage(ProtocolMessage protocolMessage) { + this.protocolMessage = protocolMessage; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java new file mode 100644 index 0000000..9237a92 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeBulletinsMessage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import org.apache.nifi.cluster.protocol.NodeBulletins; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * @author unattributed + */ +@XmlRootElement(name = "nodeBulletinsMessage") +public class NodeBulletinsMessage extends ProtocolMessage { + + private NodeBulletins bulletins; + + @Override + public MessageType getType() { + return MessageType.BULLETINS; + } + + public NodeBulletins getBulletins() { + return bulletins; + } + + public void setBulletins(NodeBulletins bulletins) { + this.bulletins = bulletins; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PingMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PingMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PingMessage.java new file mode 100644 index 0000000..ee38deb --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PingMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import java.util.Date; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * @author unattributed + */ +@XmlRootElement(name = "pingMessage") +public class PingMessage extends ProtocolMessage { + + private String id; + + private Date date = new Date(); + + public PingMessage() {} + + public Date getDate() { + return date; + } + + public void setDate(Date date) { + this.date = date; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + @Override + public MessageType getType() { + return MessageType.PING; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java new file mode 100644 index 0000000..a289abc --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter; + +/** + * @author unattributed + */ +@XmlRootElement(name = "primaryRoleAssignmentMessage") +public class PrimaryRoleAssignmentMessage extends ProtocolMessage { + + private NodeIdentifier nodeId; + + private boolean primary; + + @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) + public NodeIdentifier getNodeId() { + return nodeId; + } + + public void setNodeId(NodeIdentifier nodeId) { + this.nodeId = nodeId; + } + + public boolean isPrimary() { + return primary; + } + + public void setPrimary(boolean primary) { + this.primary = primary; + } + + @Override + public MessageType getType() { + return MessageType.PRIMARY_ROLE; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java new file mode 100644 index 0000000..6bf2a13 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +/** + * @author unattributed + */ +public abstract class ProtocolMessage { + private volatile String requestorDN; + + public static enum MessageType { + BULLETINS, + CONNECTION_REQUEST, + CONNECTION_RESPONSE, + CONTROLLER_STARTUP_FAILURE, + RECONNECTION_FAILURE, + DISCONNECTION_REQUEST, + EXCEPTION, + FLOW_REQUEST, + FLOW_RESPONSE, + HEARTBEAT, + PING, + PRIMARY_ROLE, + RECONNECTION_REQUEST, + RECONNECTION_RESPONSE, + SERVICE_BROADCAST, + } + + public abstract MessageType getType(); + + /** + * Sets the DN of the entity making the request + * @param dn + */ + public void setRequestorDN(final String dn) { + this.requestorDN = dn; + } + + /** + * Returns the DN of the entity that made the request, if using a secure socket. Otherwise, returns <code>null</code> + * @return + */ + public String getRequestorDN() { + return requestorDN; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java new file mode 100644 index 0000000..ba45e28 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionFailureMessage.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter; + +@XmlRootElement(name = "reconnectionFailureMessage") +public class ReconnectionFailureMessage extends ExceptionMessage { + private NodeIdentifier nodeId; + + public ReconnectionFailureMessage() {} + + @Override + public MessageType getType() { + return MessageType.RECONNECTION_FAILURE; + } + + @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) + public NodeIdentifier getNodeId() { + return nodeId; + } + + public void setNodeId(NodeIdentifier nodeId) { + this.nodeId = nodeId; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java new file mode 100644 index 0000000..eab3d5d --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionRequestMessage.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.StandardDataFlow; +import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter; + +/** + * @author unattributed + */ +@XmlRootElement(name = "reconnectionRequestMessage") +public class ReconnectionRequestMessage extends ProtocolMessage { + + private NodeIdentifier nodeId; + private StandardDataFlow dataFlow; + private boolean primary; + private Integer managerRemoteSiteListeningPort; + private Boolean managerRemoteSiteCommsSecure; + private String instanceId; + + public ReconnectionRequestMessage() {} + + @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) + public NodeIdentifier getNodeId() { + return nodeId; + } + + public void setNodeId(NodeIdentifier nodeId) { + this.nodeId = nodeId; + } + + public StandardDataFlow getDataFlow() { + return dataFlow; + } + + public void setDataFlow(StandardDataFlow dataFlow) { + this.dataFlow = dataFlow; + } + + public boolean isPrimary() { + return primary; + } + + public void setPrimary(boolean primary) { + this.primary = primary; + } + + @Override + public MessageType getType() { + return MessageType.RECONNECTION_REQUEST; + } + + public void setManagerRemoteSiteListeningPort(final Integer listeningPort) { + this.managerRemoteSiteListeningPort = listeningPort; + } + + public Integer getManagerRemoteSiteListeningPort() { + return managerRemoteSiteListeningPort; + } + + public void setManagerRemoteSiteCommsSecure(final Boolean remoteSiteCommsSecure) { + this.managerRemoteSiteCommsSecure = remoteSiteCommsSecure; + } + + public Boolean isManagerRemoteSiteCommsSecure() { + return managerRemoteSiteCommsSecure; + } + + public void setInstanceId(final String instanceId) { + this.instanceId = instanceId; + } + + public String getInstanceId() { + return instanceId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionResponseMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionResponseMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionResponseMessage.java new file mode 100644 index 0000000..fd0f921 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ReconnectionResponseMessage.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * This message is used an "ACK" for a ReconnectionRequestMessage + */ +@XmlRootElement(name = "reconnectionResponseMessage") +public class ReconnectionResponseMessage extends ProtocolMessage { + + @Override + public MessageType getType() { + return MessageType.RECONNECTION_RESPONSE; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ServiceBroadcastMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ServiceBroadcastMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ServiceBroadcastMessage.java new file mode 100644 index 0000000..92708ba --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ServiceBroadcastMessage.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * @author unattributed + */ +@XmlRootElement(name = "serviceBroadcastMessage") +public class ServiceBroadcastMessage extends ProtocolMessage { + + private String serviceName; + + private String address; + + private int port; + + public ServiceBroadcastMessage() {} + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + @Override + public MessageType getType() { + return MessageType.SERVICE_BROADCAST; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/MulticastConfigurationFactoryBean.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/MulticastConfigurationFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/MulticastConfigurationFactoryBean.java new file mode 100644 index 0000000..fa201bb --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/MulticastConfigurationFactoryBean.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.spring; + +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.io.socket.multicast.MulticastConfiguration; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.springframework.beans.factory.FactoryBean; + +/** + * Factory bean for creating a singleton MulticastConfiguration instance. + */ +public class MulticastConfigurationFactoryBean implements FactoryBean { + + private MulticastConfiguration configuration; + private NiFiProperties properties; + + @Override + public Object getObject() throws Exception { + if(configuration == null) { + configuration = new MulticastConfiguration(); + + final int timeout = (int) FormatUtils.getTimeDuration(properties.getClusterProtocolSocketTimeout(), TimeUnit.MILLISECONDS); + configuration.setSocketTimeout(timeout); + configuration.setReuseAddress(true); + } + return configuration; + + } + + @Override + public Class getObjectType() { + return MulticastConfiguration.class; + } + + @Override + public boolean isSingleton() { + return true; + } + + public void setProperties(NiFiProperties properties) { + this.properties = properties; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/ServerSocketConfigurationFactoryBean.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/ServerSocketConfigurationFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/ServerSocketConfigurationFactoryBean.java new file mode 100644 index 0000000..5b5816d --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/ServerSocketConfigurationFactoryBean.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.spring; + +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.io.socket.SSLContextFactory; +import org.apache.nifi.io.socket.ServerSocketConfiguration; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; + +import org.springframework.beans.factory.FactoryBean; + +/** + * Factory bean for creating a singleton ServerSocketConfiguration instance. + */ +public class ServerSocketConfigurationFactoryBean implements FactoryBean<ServerSocketConfiguration> { + private ServerSocketConfiguration configuration; + private NiFiProperties properties; + + @Override + public ServerSocketConfiguration getObject() throws Exception { + if(configuration == null) { + configuration = new ServerSocketConfiguration(); + configuration.setNeedClientAuth(properties.getNeedClientAuth()); + + final int timeout = (int) FormatUtils.getTimeDuration(properties.getClusterProtocolSocketTimeout(), TimeUnit.MILLISECONDS); + configuration.setSocketTimeout(timeout); + configuration.setReuseAddress(true); + if(Boolean.valueOf(properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_IS_SECURE))) { + configuration.setSSLContextFactory(new SSLContextFactory(properties)); + } + } + return configuration; + + } + + @Override + public Class<ServerSocketConfiguration> getObjectType() { + return ServerSocketConfiguration.class; + } + + @Override + public boolean isSingleton() { + return true; + } + + public void setProperties(NiFiProperties properties) { + this.properties = properties; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/SocketConfigurationFactoryBean.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/SocketConfigurationFactoryBean.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/SocketConfigurationFactoryBean.java new file mode 100644 index 0000000..b438e44 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/spring/SocketConfigurationFactoryBean.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.spring; + +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.io.socket.SSLContextFactory; +import org.apache.nifi.io.socket.SocketConfiguration; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; + +import org.springframework.beans.factory.FactoryBean; + +/** + * Factory bean for creating a singleton SocketConfiguration instance. + */ +public class SocketConfigurationFactoryBean implements FactoryBean<SocketConfiguration> { + + private SocketConfiguration configuration; + + private NiFiProperties properties; + + @Override + public SocketConfiguration getObject() throws Exception { + if(configuration == null) { + configuration = new SocketConfiguration(); + + final int timeout = (int) FormatUtils.getTimeDuration(properties.getClusterProtocolSocketTimeout(), TimeUnit.MILLISECONDS); + configuration.setSocketTimeout(timeout); + configuration.setReuseAddress(true); + if(Boolean.valueOf(properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_IS_SECURE))) { + configuration.setSSLContextFactory(new SSLContextFactory(properties)); + } + } + return configuration; + + } + + @Override + public Class<SocketConfiguration> getObjectType() { + return SocketConfiguration.class; + } + + @Override + public boolean isSingleton() { + return true; + } + + public void setProperties(NiFiProperties properties) { + this.properties = properties; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml new file mode 100644 index 0000000..07ea7a4 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml @@ -0,0 +1,110 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!-- marked as lazy so that cluster protocol beans are not created when applications runs in standalone mode --> +<beans default-lazy-init="true" + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd + http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd"> + + <!-- protocol context --> + <bean id="protocolContext" class="org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext"> + <constructor-arg> + <util:constant static-field="org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils.JAXB_CONTEXT"/> + </constructor-arg> + </bean> + + <!-- socket configuration --> + <bean id="protocolSocketConfiguration" class="org.apache.nifi.cluster.protocol.spring.SocketConfigurationFactoryBean"> + <property name="properties" ref="nifiProperties"/> + </bean> + + <!-- server socket configuration --> + <bean id="protocolServerSocketConfiguration" class="org.apache.nifi.cluster.protocol.spring.ServerSocketConfigurationFactoryBean"> + <property name="properties" ref="nifiProperties"/> + </bean> + + <!-- multicast configuration --> + <bean id="protocolMulticastConfiguration" class="org.apache.nifi.cluster.protocol.spring.MulticastConfigurationFactoryBean"> + <property name="properties" ref="nifiProperties"/> + </bean> + + <!-- cluster manager protocol sender --> + <bean id="clusterManagerProtocolSender" class="org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl"> + <constructor-arg ref="protocolSocketConfiguration"/> + <constructor-arg ref="protocolContext"/> + <property name="handshakeTimeout"> + <bean factory-bean="nifiProperties" factory-method="getClusterProtocolConnectionHandshakeTimeout"/> + </property> + </bean> + + <!-- cluster manager protocol listener --> + <bean id="clusterManagerProtocolListener" class="org.apache.nifi.cluster.protocol.impl.SocketProtocolListener"> + <constructor-arg index="0"> + <bean factory-bean="nifiProperties" factory-method="getClusterManagerProtocolThreads"/> + </constructor-arg> + <constructor-arg index="1"> + <bean factory-bean="nifiProperties" factory-method="getClusterManagerProtocolPort"/> + </constructor-arg> + <constructor-arg ref="protocolServerSocketConfiguration" index="2"/> + <constructor-arg ref="protocolContext" index="3"/> + </bean> + + <!-- cluster manager sender/listener --> + <bean id="clusterManagerProtocolSenderListener" class="org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener"> + <constructor-arg ref="clusterManagerProtocolSender"/> + <constructor-arg ref="clusterManagerProtocolListener"/> + </bean> + + <!-- node protocol sender --> + <bean id="nodeProtocolSender" class="org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderImpl"> + <constructor-arg ref="clusterManagerProtocolServiceLocator"/> + <constructor-arg ref="protocolSocketConfiguration"/> + <constructor-arg ref="protocolContext"/> + </bean> + + <!-- node protocol listener --> + <bean id="nodeProtocolListener" class="org.apache.nifi.cluster.protocol.impl.SocketProtocolListener"> + <constructor-arg index="0"> + <bean factory-bean="nifiProperties" factory-method="getClusterNodeProtocolThreads"/> + </constructor-arg> + <constructor-arg index="1"> + <bean factory-bean="nifiProperties" factory-method="getClusterNodeProtocolPort"/> + </constructor-arg> + <constructor-arg ref="protocolServerSocketConfiguration" index="2"/> + <constructor-arg ref="protocolContext" index="3"/> + </bean> + + <!-- node sender/listener --> + <bean id="nodeProtocolSenderListener" class="org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener"> + <constructor-arg ref="nodeProtocolSender"/> + <constructor-arg ref="nodeProtocolListener"/> + </bean> + + <!-- cluster services broadcaster --> + <bean id="clusterServicesBroadcaster" class="org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster"> + <constructor-arg index="0"> + <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastAddress"/> + </constructor-arg> + <constructor-arg ref="protocolMulticastConfiguration" index="1"/> + <constructor-arg ref="protocolContext" index="2"/> + <constructor-arg index="3"> + <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastServiceBroadcastDelay"/> + </constructor-arg> + </bean> + +</beans> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java new file mode 100644 index 0000000..59837c1 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl; + +import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener; +import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl; +import java.io.IOException; +import java.net.InetAddress; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; +import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; +import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; +import org.apache.nifi.cluster.protocol.message.PingMessage; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.io.socket.ServerSocketConfiguration; +import org.apache.nifi.io.socket.SocketConfiguration; +import org.junit.After; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * @author unattributed + */ +public class ClusterManagerProtocolSenderImplTest { + + private InetAddress address; + + private int port; + + private SocketProtocolListener listener; + + private ClusterManagerProtocolSenderImpl sender; + + private ProtocolHandler mockHandler; + + @Before + public void setup() throws IOException { + + address = InetAddress.getLocalHost(); + ServerSocketConfiguration serverSocketConfiguration = new ServerSocketConfiguration(); + serverSocketConfiguration.setSocketTimeout(2000); + + mockHandler = mock(ProtocolHandler.class); + + ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); + + listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext); + listener.addHandler(mockHandler); + listener.start(); + + port = listener.getPort(); + + SocketConfiguration socketConfiguration = new SocketConfiguration(); + sender = new ClusterManagerProtocolSenderImpl(socketConfiguration, protocolContext); + } + + @After + public void teardown() throws IOException { + if(listener.isRunning()) { + listener.stop(); + } + } + + @Test + public void testRequestFlow() throws Exception { + + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage()); + FlowRequestMessage request = new FlowRequestMessage(); + request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port)); + FlowResponseMessage response = sender.requestFlow(request); + assertNotNull(response); + } + + @Test + public void testRequestFlowWithBadResponseMessage() throws Exception { + + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage()); + FlowRequestMessage request = new FlowRequestMessage(); + request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port)); + try { + sender.requestFlow(request); + fail("failed to throw exception"); + } catch(ProtocolException pe) {} + + } + + @Test + public void testRequestFlowDelayedResponse() throws Exception { + + final int time = 250; + sender.getSocketConfiguration().setSocketTimeout(time); + + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<FlowResponseMessage>() { + @Override + public FlowResponseMessage answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(time * 3); + return new FlowResponseMessage(); + } + }); + FlowRequestMessage request = new FlowRequestMessage(); + request.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port)); + try { + sender.requestFlow(request); + fail("failed to throw exception"); + } catch(ProtocolException pe) {} + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java new file mode 100644 index 0000000..91f81af --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl; + +import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; +import org.apache.nifi.cluster.protocol.message.PingMessage; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; +import org.apache.nifi.io.socket.multicast.MulticastConfiguration; +import org.apache.nifi.io.socket.multicast.MulticastUtils; +import org.junit.After; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * @author unattributed + */ +public class ClusterServiceDiscoveryTest { + + private ClusterServiceDiscovery discovery; + + private String serviceName; + + private MulticastSocket socket; + + private InetSocketAddress multicastAddress; + + private MulticastConfiguration configuration; + + private ProtocolContext protocolContext; + + @Before + public void setup() throws Exception { + + serviceName = "some-service"; + multicastAddress = new InetSocketAddress("225.1.1.1", 22222); + configuration = new MulticastConfiguration(); + + protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); + + discovery = new ClusterServiceDiscovery(serviceName, multicastAddress, configuration, protocolContext); + discovery.start(); + + socket = MulticastUtils.createMulticastSocket(multicastAddress.getPort(), configuration); + } + + @After + public void teardown() throws IOException { + try { + if(discovery.isRunning()) { + discovery.stop(); + } + } finally { + MulticastUtils.closeQuietly(socket); + } + } + + @Test + public void testGetAddressOnStartup() { + assertNull(discovery.getService()); + } + + @Ignore("This test has an NPE after ignoring another...perhaps has a bad inter-test dependency") + @Test + public void testGetAddressAfterBroadcast() throws Exception { + + ServiceBroadcastMessage msg = new ServiceBroadcastMessage(); + msg.setServiceName("some-service"); + msg.setAddress("3.3.3.3"); + msg.setPort(1234); + + // marshal message to output stream + ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + marshaller.marshal(msg, baos); + byte[] requestPacketBytes = baos.toByteArray(); + DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, multicastAddress); + socket.send(packet); + + Thread.sleep(250); + + InetSocketAddress updatedAddress = discovery.getService().getServiceAddress(); + assertEquals("some-service", discovery.getServiceName()); + assertEquals("3.3.3.3", updatedAddress.getHostName()); + assertEquals(1234, updatedAddress.getPort()); + + } + + @Test + public void testBadBroadcastMessage() throws Exception { + + ProtocolMessage msg = new PingMessage(); + + // marshal message to output stream + ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + marshaller.marshal(msg, baos); + byte[] requestPacketBytes = baos.toByteArray(); + DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, multicastAddress); + socket.send(packet); + + Thread.sleep(250); + + assertNull(discovery.getService()); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java new file mode 100644 index 0000000..b1c156b --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl; + +import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator; +import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; +import org.apache.nifi.io.socket.multicast.DiscoverableService; +import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Mockito.*; +import org.mockito.stubbing.OngoingStubbing; + +public class ClusterServiceLocatorTest { + + private ClusterServiceDiscovery mockServiceDiscovery; + + private int fixedPort; + + private DiscoverableService fixedService; + + private ClusterServiceLocator serviceDiscoveryLocator; + + private ClusterServiceLocator serviceDiscoveryFixedPortLocator; + + private ClusterServiceLocator fixedServiceLocator; + + @Before + public void setup() throws Exception { + + fixedPort = 1; + mockServiceDiscovery = mock(ClusterServiceDiscovery.class); + fixedService = new DiscoverableServiceImpl("some-service", InetSocketAddress.createUnresolved("some-host", 20)); + + serviceDiscoveryLocator = new ClusterServiceLocator(mockServiceDiscovery); + serviceDiscoveryFixedPortLocator = new ClusterServiceLocator(mockServiceDiscovery, fixedPort); + fixedServiceLocator = new ClusterServiceLocator(fixedService); + + } + + @Test + public void getServiceWhenServiceDiscoveryNotStarted() { + assertNull(serviceDiscoveryLocator.getService()); + } + + @Test + public void getServiceWhenServiceDiscoveryFixedPortNotStarted() { + assertNull(serviceDiscoveryLocator.getService()); + } + + @Test + public void getServiceWhenFixedServiceNotStarted() { + assertEquals(fixedService, fixedServiceLocator.getService()); + } + + @Test + public void getServiceNotOnFirstAttempt() { + + ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig(); + config.setNumAttempts(2); + config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS); + config.setTimeBetweenAttempts(1); + + serviceDiscoveryLocator.setAttemptsConfig(config); + + OngoingStubbing<DiscoverableService> stubbing = null; + for(int i = 0; i < config.getNumAttempts() - 1; i++) { + if(stubbing == null) { + stubbing = when(mockServiceDiscovery.getService()).thenReturn(null); + } else { + stubbing.thenReturn(null); + } + } + stubbing.thenReturn(fixedService); + + assertEquals(fixedService, serviceDiscoveryLocator.getService()); + + } + + @Test + public void getServiceNotOnFirstAttemptWithFixedPort() { + + ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig(); + config.setNumAttempts(2); + config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS); + config.setTimeBetweenAttempts(1); + + serviceDiscoveryFixedPortLocator.setAttemptsConfig(config); + + OngoingStubbing<DiscoverableService> stubbing = null; + for(int i = 0; i < config.getNumAttempts() - 1; i++) { + if(stubbing == null) { + stubbing = when(mockServiceDiscovery.getService()).thenReturn(null); + } else { + stubbing.thenReturn(null); + } + } + stubbing.thenReturn(fixedService); + + InetSocketAddress resultAddress = InetSocketAddress.createUnresolved(fixedService.getServiceAddress().getHostName(), fixedPort); + DiscoverableService resultService = new DiscoverableServiceImpl(fixedService.getServiceName(), resultAddress); + assertEquals(resultService, serviceDiscoveryFixedPortLocator.getService()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java new file mode 100644 index 0000000..ec1f26d --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl; + +import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster; +import org.apache.nifi.cluster.protocol.impl.MulticastProtocolListener; +import java.net.InetSocketAddress; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; +import org.apache.nifi.io.socket.multicast.DiscoverableService; +import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; +import org.apache.nifi.io.socket.multicast.MulticastConfiguration; +import org.junit.After; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * @author unattributed + */ +public class ClusterServicesBroadcasterTest { + + private ClusterServicesBroadcaster broadcaster; + + private MulticastProtocolListener listener; + + private DummyProtocolHandler handler; + + private InetSocketAddress multicastAddress; + + private DiscoverableService broadcastedService; + + private ProtocolContext protocolContext; + + private MulticastConfiguration configuration; + + @Before + public void setup() throws Exception { + + broadcastedService = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", 11111)); + + multicastAddress = new InetSocketAddress("225.1.1.1", 22222); + + configuration = new MulticastConfiguration(); + + protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); + + broadcaster = new ClusterServicesBroadcaster(multicastAddress, configuration, protocolContext, "500 ms"); + broadcaster.addService(broadcastedService); + + handler = new DummyProtocolHandler(); + listener = new MulticastProtocolListener(5, multicastAddress, configuration, protocolContext); + listener.addHandler(handler); + } + + @After + public void teardown() { + + if(broadcaster.isRunning()) { + broadcaster.stop(); + } + + try { + if(listener.isRunning()) { + listener.stop(); + } + } catch(Exception ex) { + ex.printStackTrace(System.out); + } + + } + + @Ignore("fails needs to be fixed") + @Test + public void testBroadcastReceived() throws Exception { + + broadcaster.start(); + listener.start(); + + Thread.sleep(1000); + + listener.stop(); + + assertNotNull(handler.getProtocolMessage()); + assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, handler.getProtocolMessage().getType()); + final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) handler.getProtocolMessage(); + assertEquals(broadcastedService.getServiceName(), msg.getServiceName()); + assertEquals(broadcastedService.getServiceAddress().getHostName(), msg.getAddress()); + assertEquals(broadcastedService.getServiceAddress().getPort(), msg.getPort()); + } + + private class DummyProtocolHandler implements ProtocolHandler { + + private ProtocolMessage protocolMessage; + + @Override + public boolean canHandle(ProtocolMessage msg) { + return true; + } + + @Override + public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { + this.protocolMessage = msg; + return null; + } + + public ProtocolMessage getProtocolMessage() { + return protocolMessage; + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java new file mode 100644 index 0000000..af00590 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl; + +import org.apache.nifi.cluster.protocol.impl.MulticastProtocolListener; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.util.ArrayList; +import java.util.List; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; +import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage; +import org.apache.nifi.cluster.protocol.message.PingMessage; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.io.socket.multicast.MulticastConfiguration; +import org.apache.nifi.io.socket.multicast.MulticastUtils; +import org.junit.After; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * @author unattributed + */ +public class MulticastProtocolListenerTest { + + private MulticastProtocolListener listener; + + private MulticastSocket socket; + + private InetSocketAddress address; + + private MulticastConfiguration configuration; + + private ProtocolContext protocolContext; + + @Before + public void setup() throws Exception { + + address = new InetSocketAddress("226.1.1.1", 60000); + configuration = new MulticastConfiguration(); + + protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); + + listener = new MulticastProtocolListener(5, address, configuration, protocolContext); + listener.start(); + + socket = MulticastUtils.createMulticastSocket(address.getPort(), configuration); + } + + @After + public void teardown() throws IOException { + try { + if(listener.isRunning()) { + listener.stop(); + } + } finally { + MulticastUtils.closeQuietly(socket); + } + } + + @Test + public void testBadRequest() throws Exception { + DelayedProtocolHandler handler = new DelayedProtocolHandler(0); + listener.addHandler(handler); + DatagramPacket packet = new DatagramPacket(new byte[] {5}, 1, address); + socket.send(packet); + Thread.sleep(250); + assertEquals(0, handler.getMessages().size()); + } + + @Ignore("this test works sometimes and fails others - needs work to be reliable") + @Test + public void testRequest() throws Exception { + + ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler(); + listener.addHandler(handler); + + ProtocolMessage msg = new PingMessage(); + MulticastProtocolMessage multicastMsg = new MulticastProtocolMessage("some-id", msg); + + // marshal message to output stream + ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + marshaller.marshal(multicastMsg, baos); + byte[] requestPacketBytes = baos.toByteArray(); + DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, address); + socket.send(packet); + + Thread.sleep(250); + assertEquals(1, handler.getMessages().size()); + assertEquals(msg.getType(), handler.getMessages().get(0).getType()); + + } + + private class ReflexiveProtocolHandler implements ProtocolHandler { + + private List<ProtocolMessage> messages = new ArrayList<>(); + + @Override + public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { + messages.add(msg); + return msg; + } + + @Override + public boolean canHandle(ProtocolMessage msg) { + return true; + } + + public List<ProtocolMessage> getMessages() { + return messages; + } + + } + + private class DelayedProtocolHandler implements ProtocolHandler { + + private int delay = 0; + + private List<ProtocolMessage> messages = new ArrayList<>(); + + public DelayedProtocolHandler(int delay) { + this.delay = delay; + } + + @Override + public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { + try { + messages.add(msg); + Thread.sleep(delay); + return null; + } catch(final InterruptedException ie) { + throw new ProtocolException(ie); + } + + } + + @Override + public boolean canHandle(ProtocolMessage msg) { + return true; + } + + public List<ProtocolMessage> getMessages() { + return messages; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java new file mode 100644 index 0000000..1c5ba9e --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl; + +import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator; +import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener; +import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderImpl; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.UUID; + +import org.apache.nifi.cluster.protocol.ConnectionRequest; +import org.apache.nifi.cluster.protocol.ConnectionResponse; +import org.apache.nifi.cluster.protocol.Heartbeat; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.StandardDataFlow; +import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; +import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext; +import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils; +import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; +import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; +import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.cluster.protocol.message.PingMessage; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.io.socket.ServerSocketConfiguration; +import org.apache.nifi.io.socket.SocketConfiguration; +import org.apache.nifi.io.socket.multicast.DiscoverableService; +import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * @author unattributed + */ +@Ignore("Randomly tests... probably timing-specific") +public class NodeProtocolSenderImplTest { + + private SocketProtocolListener listener; + + private NodeProtocolSenderImpl sender; + + private DiscoverableService service; + + private ServerSocketConfiguration serverSocketConfiguration; + + private ClusterServiceLocator mockServiceLocator; + + private ProtocolHandler mockHandler; + + private NodeIdentifier nodeIdentifier; + + @Before + public void setup() throws IOException { + + serverSocketConfiguration = new ServerSocketConfiguration(); + + mockServiceLocator = mock(ClusterServiceLocator.class); + mockHandler = mock(ProtocolHandler.class); + + nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, "localhost", 5678); + + ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT); + + listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext); + listener.setShutdownListenerSeconds(3); + listener.addHandler(mockHandler); + listener.start(); + + service = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", listener.getPort())); + + SocketConfiguration socketConfiguration = new SocketConfiguration(); + socketConfiguration.setReuseAddress(true); + sender = new NodeProtocolSenderImpl(mockServiceLocator, socketConfiguration, protocolContext); + } + + @After + public void teardown() throws IOException { + if(listener.isRunning()) { + listener.stop(); + } + } + + @Test + public void testConnect() throws Exception { + + when(mockServiceLocator.getService()).thenReturn(service); + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + ConnectionResponseMessage mockMessage = new ConnectionResponseMessage(); + mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier, new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString())); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage); + + ConnectionRequestMessage request = new ConnectionRequestMessage(); + request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); + ConnectionResponseMessage response = sender.requestConnection(request); + assertNotNull(response); + } + + @Test(expected = UnknownServiceAddressException.class) + public void testConnectNoClusterManagerAddress() throws Exception { + + when(mockServiceLocator.getService()).thenReturn(null); + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new ConnectionResponseMessage()); + + ConnectionRequestMessage request = new ConnectionRequestMessage(); + request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); + + sender.requestConnection(request); + fail("failed to throw exception"); + } + + @Test(expected = ProtocolException.class) + public void testConnectBadResponse() throws Exception { + + when(mockServiceLocator.getService()).thenReturn(service); + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage()); + + ConnectionRequestMessage request = new ConnectionRequestMessage(); + request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); + + sender.requestConnection(request); + fail("failed to throw exception"); + + } + + @Test(expected = ProtocolException.class) + public void testConnectDelayedResponse() throws Exception { + + final int time = 250; + sender.getSocketConfiguration().setSocketTimeout(time); + when(mockServiceLocator.getService()).thenReturn(service); + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<ConnectionResponseMessage>() { + @Override + public ConnectionResponseMessage answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(time * 3); + return new ConnectionResponseMessage(); + } + }); + ConnectionRequestMessage request = new ConnectionRequestMessage(); + request.setConnectionRequest(new ConnectionRequest(nodeIdentifier)); + + sender.requestConnection(request); + fail("failed to throw exception"); + + } + + @Test + public void testHeartbeat() throws Exception { + + when(mockServiceLocator.getService()).thenReturn(service); + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null); + + HeartbeatMessage hb = new HeartbeatMessage(); + hb.setHeartbeat(new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4), false, false, new byte[] {1, 2, 3})); + sender.heartbeat(hb); + } + + @Test + public void testNotifyControllerStartupFailure() throws Exception { + + when(mockServiceLocator.getService()).thenReturn(service); + when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE); + when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null); + + ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage(); + msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1)); + msg.setExceptionMessage("some exception"); + sender.notifyControllerStartupFailure(msg); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java new file mode 100644 index 0000000..07ee83a --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl.testutils; + +import java.util.ArrayList; +import java.util.List; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; + +/** + * @author unattributed + */ +public class DelayedProtocolHandler implements ProtocolHandler { + + private int delay = 0; + private List<ProtocolMessage> messages = new ArrayList<>(); + + public DelayedProtocolHandler(int delay) { + this.delay = delay; + } + + @Override + public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException { + try { + messages.add(msg); + Thread.sleep(delay); + return null; + } catch (final InterruptedException ie) { + throw new ProtocolException(ie); + } + + } + + @Override + public boolean canHandle(ProtocolMessage msg) { + return true; + } + + public List<ProtocolMessage> getMessages() { + return messages; + } +}