http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/DisconnectedNodeMutableRequestException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/DisconnectedNodeMutableRequestException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/DisconnectedNodeMutableRequestException.java new file mode 100644 index 0000000..412a555 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/DisconnectedNodeMutableRequestException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a HTTP request that may change a node's + * dataflow is to be replicated while one or more nodes are disconnected. + * + * @author unattributed + */ +public class DisconnectedNodeMutableRequestException extends MutableRequestException { + + public DisconnectedNodeMutableRequestException() { + } + + public DisconnectedNodeMutableRequestException(String msg) { + super(msg); + } + + public DisconnectedNodeMutableRequestException(Throwable cause) { + super(cause); + } + + public DisconnectedNodeMutableRequestException(String msg, Throwable cause) { + super(msg, cause); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java new file mode 100644 index 0000000..6c4e670 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Signals that an operation to be performed on a cluster has been invoked at an + * illegal or inappropriate time. + * + * @author unattributed + */ +public class IllegalClusterStateException extends ClusterException { + + public IllegalClusterStateException() { + } + + public IllegalClusterStateException(String msg) { + super(msg); + } + + public IllegalClusterStateException(Throwable cause) { + super(cause); + } + + public IllegalClusterStateException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDeletionException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDeletionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDeletionException.java new file mode 100644 index 0000000..adef62a --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDeletionException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a deletion request is issued to a node + * that cannot be deleted (e.g., the node is not disconnected). + * + * @author unattributed + */ +public class IllegalNodeDeletionException extends IllegalClusterStateException { + + public IllegalNodeDeletionException() { + } + + public IllegalNodeDeletionException(String msg) { + super(msg); + } + + public IllegalNodeDeletionException(Throwable cause) { + super(cause); + } + + public IllegalNodeDeletionException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java new file mode 100644 index 0000000..7e61b24 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a disconnection request is issued to a + * node that cannot be disconnected (e.g., last node in cluster, node is primary + * node). + * + * @author unattributed + */ +public class IllegalNodeDisconnectionException extends IllegalClusterStateException { + + public IllegalNodeDisconnectionException() { + } + + public IllegalNodeDisconnectionException(String msg) { + super(msg); + } + + public IllegalNodeDisconnectionException(Throwable cause) { + super(cause); + } + + public IllegalNodeDisconnectionException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeReconnectionException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeReconnectionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeReconnectionException.java new file mode 100644 index 0000000..96c76bc --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeReconnectionException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a reconnection request is issued to a + * node that cannot be reconnected (e.g., the node is not disconnected). + * + * @author unattributed + */ +public class IllegalNodeReconnectionException extends IllegalClusterStateException { + + public IllegalNodeReconnectionException() { + } + + public IllegalNodeReconnectionException(String msg) { + super(msg); + } + + public IllegalNodeReconnectionException(Throwable cause) { + super(cause); + } + + public IllegalNodeReconnectionException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IneligiblePrimaryNodeException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IneligiblePrimaryNodeException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IneligiblePrimaryNodeException.java new file mode 100644 index 0000000..4b0097a --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IneligiblePrimaryNodeException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when the primary role cannot be assigned to a + * node because the node is ineligible for the role. + * + * @author unattributed + */ +public class IneligiblePrimaryNodeException extends IllegalClusterStateException { + + public IneligiblePrimaryNodeException() { + } + + public IneligiblePrimaryNodeException(String msg) { + super(msg); + } + + public IneligiblePrimaryNodeException(Throwable cause) { + super(cause); + } + + public IneligiblePrimaryNodeException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/MutableRequestException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/MutableRequestException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/MutableRequestException.java new file mode 100644 index 0000000..d160587 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/MutableRequestException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a HTTP request that may change a node's + * state is to be replicated while the cluster or connected nodes are unable to + * change their state (e.g., a new node is connecting to the cluster). + * + * @author unattributed + */ +public class MutableRequestException extends IllegalClusterStateException { + + public MutableRequestException() { + } + + public MutableRequestException(String msg) { + super(msg); + } + + public MutableRequestException(Throwable cause) { + super(cause); + } + + public MutableRequestException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java new file mode 100644 index 0000000..8d704b9 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when the cluster is unable to service a + * request because no nodes are connected. + * + * @author unattributed + */ +public class NoConnectedNodesException extends ClusterException { + + public NoConnectedNodesException() { + } + + public NoConnectedNodesException(String msg) { + super(msg); + } + + public NoConnectedNodesException(Throwable cause) { + super(cause); + } + + public NoConnectedNodesException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java new file mode 100644 index 0000000..9e17a23 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when the cluster is unable to service a + * request because no nodes returned a response. When the given request is not + * mutable the nodes are left in their previous state. + * + * @author unattributed + */ +public class NoResponseFromNodesException extends ClusterException { + + public NoResponseFromNodesException() { + } + + public NoResponseFromNodesException(String msg) { + super(msg); + } + + public NoResponseFromNodesException(Throwable cause) { + super(cause); + } + + public NoResponseFromNodesException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java new file mode 100644 index 0000000..3bd2f4b --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a disconnection request to a node + * failed. + * + * @author unattributed + */ +public class NodeDisconnectionException extends ClusterException { + + public NodeDisconnectionException() { + } + + public NodeDisconnectionException(String msg) { + super(msg); + } + + public NodeDisconnectionException(Throwable cause) { + super(cause); + } + + public NodeDisconnectionException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java new file mode 100644 index 0000000..8c40cef --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a reconnection request to a node failed. + * + * @author unattributed + */ +public class NodeReconnectionException extends ClusterException { + + public NodeReconnectionException() { + } + + public NodeReconnectionException(String msg) { + super(msg); + } + + public NodeReconnectionException(Throwable cause) { + super(cause); + } + + public NodeReconnectionException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/PrimaryRoleAssignmentException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/PrimaryRoleAssignmentException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/PrimaryRoleAssignmentException.java new file mode 100644 index 0000000..403f7a5 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/PrimaryRoleAssignmentException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when the cluster is unable to update the + * primary role of a node. + * + * @author unattributed + */ +public class PrimaryRoleAssignmentException extends IllegalClusterStateException { + + public PrimaryRoleAssignmentException() { + } + + public PrimaryRoleAssignmentException(String msg) { + super(msg); + } + + public PrimaryRoleAssignmentException(Throwable cause) { + super(cause); + } + + public PrimaryRoleAssignmentException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/SafeModeMutableRequestException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/SafeModeMutableRequestException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/SafeModeMutableRequestException.java new file mode 100644 index 0000000..f544f26 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/SafeModeMutableRequestException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a HTTP request that may change a node's + * dataflow is to be replicated while the cluster is in safe mode. + * + * @author unattributed + */ +public class SafeModeMutableRequestException extends MutableRequestException { + + public SafeModeMutableRequestException() { + } + + public SafeModeMutableRequestException(String msg) { + super(msg); + } + + public SafeModeMutableRequestException(Throwable cause) { + super(cause); + } + + public SafeModeMutableRequestException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java new file mode 100644 index 0000000..914bb56 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a request is made for a node that does + * not exist. + * + * @author unattributed + */ +public class UnknownNodeException extends ClusterException { + + public UnknownNodeException() { + } + + public UnknownNodeException(String msg) { + super(msg); + } + + public UnknownNodeException(Throwable cause) { + super(cause); + } + + public UnknownNodeException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UriConstructionException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UriConstructionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UriConstructionException.java new file mode 100644 index 0000000..773d7b5 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UriConstructionException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.exception; + +/** + * Represents the exceptional case when a URI cannot be constructed from the + * given information. This exception is similar to Java's URISyntaxException + * except that it extends RuntimeException. + * + * @author unattributed + */ +public class UriConstructionException extends RuntimeException { + + public UriConstructionException() { + } + + public UriConstructionException(String msg) { + super(msg); + } + + public UriConstructionException(Throwable cause) { + super(cause); + } + + public UriConstructionException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java new file mode 100644 index 0000000..2015530 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QuerySubmission; +import org.apache.nifi.provenance.search.SearchableField; +import org.apache.nifi.reporting.EventAccess; + +public class ClusteredEventAccess implements EventAccess { + + private final WebClusterManager clusterManager; + + public ClusteredEventAccess(final WebClusterManager clusterManager) { + this.clusterManager = clusterManager; + } + + @Override + public ProcessGroupStatus getControllerStatus() { + return clusterManager.getProcessGroupStatus(WebClusterManager.ROOT_GROUP_ID_ALIAS); + } + + @Override + public List<ProvenanceEventRecord> getProvenanceEvents(long arg0, int arg1) throws IOException { + return new ArrayList<>(); + } + + @Override + public ProvenanceEventRepository getProvenanceRepository() { + // NCM doesn't have provenance events, because it doesn't process FlowFiles. + // So we just use a Provenance Event Repository that does nothing. + return new ProvenanceEventRepository() { + @Override + public void close() throws IOException { + } + + @Override + public ProvenanceEventRecord getEvent(long eventId) throws IOException { + return null; + } + + @Override + public List<ProvenanceEventRecord> getEvents(long startEventId, int maxEvents) throws IOException { + return new ArrayList<>(); + } + + @Override + public Long getMaxEventId() { + return null; + } + + @Override + public List<SearchableField> getSearchableAttributes() { + return new ArrayList<>(); + } + + @Override + public List<SearchableField> getSearchableFields() { + return new ArrayList<>(); + } + + @Override + public void registerEvent(final ProvenanceEventRecord event) { + } + + @Override + public void registerEvents(final Iterable<ProvenanceEventRecord> events) { + } + + @Override + public ComputeLineageSubmission retrieveLineageSubmission(final String submissionId) { + return null; + } + + @Override + public QuerySubmission retrieveQuerySubmission(final String submissionId) { + return null; + } + + @Override + public ComputeLineageSubmission submitExpandChildren(final long eventId) { + return null; + } + + @Override + public ComputeLineageSubmission submitExpandParents(final long eventId) { + return null; + } + + @Override + public ComputeLineageSubmission submitLineageComputation(final String flowFileUuid) { + return null; + } + + @Override + public QuerySubmission submitQuery(final Query query) { + return null; + } + + @Override + public ProvenanceEventBuilder eventBuilder() { + return null; + } + + @Override + public void initialize(EventReporter eventReporter) throws IOException { + + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java new file mode 100644 index 0000000..e546f87 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.impl; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.attribute.expression.language.PreparedQuery; +import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.status.PortStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.events.BulletinFactory; +import org.apache.nifi.processor.StandardPropertyValue; +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.Severity; + +public class ClusteredReportingContext implements ReportingContext { + + private final EventAccess eventAccess; + private final BulletinRepository bulletinRepository; + private final ControllerServiceProvider serviceProvider; + private final Map<PropertyDescriptor, String> properties; + private final Map<PropertyDescriptor, PreparedQuery> preparedQueries; + + public ClusteredReportingContext(final EventAccess eventAccess, final BulletinRepository bulletinRepository, + final Map<PropertyDescriptor, String> properties, final ControllerServiceProvider serviceProvider) { + this.eventAccess = eventAccess; + this.bulletinRepository = bulletinRepository; + this.properties = Collections.unmodifiableMap(properties); + this.serviceProvider = serviceProvider; + + preparedQueries = new HashMap<>(); + for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { + final PropertyDescriptor desc = entry.getKey(); + String value = entry.getValue(); + if (value == null) { + value = desc.getDefaultValue(); + } + + final PreparedQuery pq = Query.prepare(value); + preparedQueries.put(desc, pq); + } + } + + @Override + public EventAccess getEventAccess() { + return eventAccess; + } + + @Override + public BulletinRepository getBulletinRepository() { + return bulletinRepository; + } + + @Override + public Bulletin createBulletin(final String category, final Severity severity, final String message) { + return BulletinFactory.createBulletin(category, severity.name(), message); + } + + @Override + public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) { + final ProcessGroupStatus rootGroupStatus = eventAccess.getControllerStatus(); + final String groupId = findGroupId(rootGroupStatus, componentId); + final String componentName = findComponentName(rootGroupStatus, componentId); + + return BulletinFactory.createBulletin(groupId, componentId, componentName, category, severity.name(), message); + } + + @Override + public Map<PropertyDescriptor, String> getProperties() { + return Collections.unmodifiableMap(properties); + } + + @Override + public PropertyValue getProperty(final PropertyDescriptor property) { + final String configuredValue = properties.get(property); + return new StandardPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, serviceProvider, preparedQueries.get(property)); + } + + @Override + public ControllerServiceLookup getControllerServiceLookup() { + return serviceProvider; + } + + String findGroupId(final ProcessGroupStatus groupStatus, final String componentId) { + for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) { + if (procStatus.getId().equals(componentId)) { + return groupStatus.getId(); + } + } + + for (final PortStatus portStatus : groupStatus.getInputPortStatus()) { + if (portStatus.getId().equals(componentId)) { + return groupStatus.getId(); + } + } + + for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) { + if (portStatus.getId().equals(componentId)) { + return groupStatus.getId(); + } + } + + for (final ProcessGroupStatus childGroup : groupStatus.getProcessGroupStatus()) { + final String groupId = findGroupId(childGroup, componentId); + if (groupId != null) { + return groupId; + } + } + + return null; + } + + private String findComponentName(final ProcessGroupStatus groupStatus, final String componentId) { + for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) { + if (procStatus.getId().equals(componentId)) { + return procStatus.getName(); + } + } + + for (final PortStatus portStatus : groupStatus.getInputPortStatus()) { + if (portStatus.getId().equals(componentId)) { + return groupStatus.getName(); + } + } + + for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) { + if (portStatus.getId().equals(componentId)) { + return groupStatus.getName(); + } + } + + for (final ProcessGroupStatus childGroup : groupStatus.getProcessGroupStatus()) { + final String componentName = findComponentName(childGroup, componentId); + if (componentName != null) { + return componentName; + } + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java new file mode 100644 index 0000000..81bb7a7 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.impl; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter; +import com.sun.jersey.core.util.MultivaluedMapImpl; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; + +import org.apache.nifi.cluster.manager.HttpRequestReplicator; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.manager.exception.UriConstructionException; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.logging.NiFiLog; +import org.apache.nifi.util.FormatUtils; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of the <code>HttpRequestReplicator</code> interface. This + * implementation parallelizes the node HTTP requests using the given + * <code>ExecutorService</code> instance. Individual requests may have + * connection and read timeouts set, which may be set during instance + * construction. Otherwise, the default is not to timeout. + * + * If a node protocol scheme is provided during construction, then all requests + * will be replicated using the given scheme. If null is provided as the scheme + * (the default), then the requests will be replicated using the scheme of the + * original URI. + * + * Clients must call start() and stop() to initialize and shutdown the instance. + * The instance must be started before issuing any replication requests. + * + * @author unattributed + */ +public class HttpRequestReplicatorImpl implements HttpRequestReplicator { + + // defaults + private static final int DEFAULT_SHUTDOWN_REPLICATOR_SECONDS = 30; + + // logger + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(HttpRequestReplicatorImpl.class)); + + // final members + private final Client client; // the client to use for issuing requests + private final int numThreads; // number of threads to use for request replication + private final int connectionTimeoutMs; // connection timeout per node request + private final int readTimeoutMs; // read timeout per node request + + // members + private ExecutorService executorService; + private int shutdownReplicatorSeconds = DEFAULT_SHUTDOWN_REPLICATOR_SECONDS; + + // guarded by synchronized method access in support of multithreaded replication + private String nodeProtocolScheme = null; + + /** + * Creates an instance. The connection timeout and read timeout will be + * infinite. + * + * @param numThreads the number of threads to use when parallelizing + * requests + * @param client a client for making requests + */ + public HttpRequestReplicatorImpl(final int numThreads, final Client client) { + this(numThreads, client, "0 sec", "0 sec"); + } + + /** + * Creates an instance. + * + * @param numThreads the number of threads to use when parallelizing + * requests + * @param client a client for making requests + * @param connectionTimeoutMs the connection timeout specified in + * milliseconds + * @param readTimeoutMs the read timeout specified in milliseconds + */ + public HttpRequestReplicatorImpl(final int numThreads, final Client client, final String connectionTimeout, final String readTimeout) { + + if (numThreads <= 0) { + throw new IllegalArgumentException("The number of threads must be greater than zero."); + } else if (client == null) { + throw new IllegalArgumentException("Client may not be null."); + } + + this.numThreads = numThreads; + this.client = client; + this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS); + this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS); + + client.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, connectionTimeoutMs); + client.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, readTimeoutMs); + client.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE); + } + + @Override + public void start() { + if (isRunning()) { + throw new IllegalStateException("Instance is already started."); + } + executorService = Executors.newFixedThreadPool(numThreads); + } + + @Override + public boolean isRunning() { + return executorService != null && !executorService.isShutdown(); + } + + @Override + public void stop() { + + if (!isRunning()) { + throw new IllegalStateException("Instance is already stopped."); + } + + // shutdown executor service + try { + if (getShutdownReplicatorSeconds() <= 0) { + executorService.shutdownNow(); + } else { + executorService.shutdown(); + } + executorService.awaitTermination(getShutdownReplicatorSeconds(), TimeUnit.SECONDS); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } finally { + if (executorService.isTerminated()) { + logger.info("HTTP Request Replicator has been terminated successfully."); + } else { + logger.warn("HTTP Request Replicator has not terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop."); + } + } + } + + /** + * Sets the protocol scheme to use when issuing requests to nodes. + * + * @param nodeProtocolScheme the scheme. Valid values are "http", "https", + * or null. If null is specified, then the scheme of the originating request + * is used when replicating that request. + */ + public synchronized void setNodeProtocolScheme(final String nodeProtocolScheme) { + if (StringUtils.isNotBlank(nodeProtocolScheme)) { + if (!"http".equalsIgnoreCase(nodeProtocolScheme) && !"https".equalsIgnoreCase(nodeProtocolScheme)) { + throw new IllegalArgumentException("Node Protocol Scheme must be either HTTP or HTTPS"); + } + } + this.nodeProtocolScheme = nodeProtocolScheme; + } + + public synchronized String getNodeProtocolScheme() { + return nodeProtocolScheme; + } + + private synchronized String getNodeProtocolScheme(final URI uri) { + // if we are not configured to use a protocol scheme, then use the uri's scheme + if (StringUtils.isBlank(nodeProtocolScheme)) { + return uri.getScheme(); + } + return nodeProtocolScheme; + } + + public int getConnectionTimeoutMs() { + return connectionTimeoutMs; + } + + public int getReadTimeoutMs() { + return readTimeoutMs; + } + + public int getShutdownReplicatorSeconds() { + return shutdownReplicatorSeconds; + } + + public void setShutdownReplicatorSeconds(int shutdownReplicatorSeconds) { + this.shutdownReplicatorSeconds = shutdownReplicatorSeconds; + } + + @Override + public Set<NodeResponse> replicate(final Set<NodeIdentifier> nodeIds, final String method, + final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers) + throws UriConstructionException { + if (nodeIds == null) { + throw new IllegalArgumentException("Node IDs may not be null."); + } else if (method == null) { + throw new IllegalArgumentException("HTTP method may not be null."); + } else if (uri == null) { + throw new IllegalArgumentException("URI may not be null."); + } else if (parameters == null) { + throw new IllegalArgumentException("Parameters may not be null."); + } else if (headers == null) { + throw new IllegalArgumentException("HTTP headers map may not be null."); + } + return replicateHelper(nodeIds, method, getNodeProtocolScheme(uri), uri.getPath(), parameters, /* entity */ null, headers); + } + + @Override + public Set<NodeResponse> replicate(final Set<NodeIdentifier> nodeIds, final String method, final URI uri, + final Object entity, final Map<String, String> headers) throws UriConstructionException { + if (nodeIds == null) { + throw new IllegalArgumentException("Node IDs may not be null."); + } else if (method == null) { + throw new IllegalArgumentException("HTTP method may not be null."); + } else if (method.equalsIgnoreCase(HttpMethod.DELETE) || method.equalsIgnoreCase(HttpMethod.GET) || method.equalsIgnoreCase(HttpMethod.HEAD) || method.equalsIgnoreCase(HttpMethod.OPTIONS)) { + throw new IllegalArgumentException("HTTP (DELETE | GET | HEAD | OPTIONS) requests cannot have a body containing an entity."); + } else if (uri == null) { + throw new IllegalArgumentException("URI may not be null."); + } else if (entity == null) { + throw new IllegalArgumentException("Entity may not be null."); + } else if (headers == null) { + throw new IllegalArgumentException("HTTP headers map may not be null."); + } + return replicateHelper(nodeIds, method, getNodeProtocolScheme(uri), uri.getPath(), /* parameters */ null, entity, headers); + } + + private Set<NodeResponse> replicateHelper(final Set<NodeIdentifier> nodeIds, final String method, final String scheme, + final String path, final Map<String, List<String>> parameters, final Object entity, final Map<String, String> headers) + throws UriConstructionException { + + if (nodeIds.isEmpty()) { + return new HashSet<>(); // return quickly for trivial case + } + + final CompletionService<NodeResponse> completionService = new ExecutorCompletionService<>(executorService); + + // keeps track of future requests so that failed requests can be tied back to the failing node + final Collection<NodeHttpRequestFutureWrapper> futureNodeHttpRequests = new ArrayList<>(); + + // construct the URIs for the nodes + final Map<NodeIdentifier, URI> uriMap = new HashMap<>(); + try { + for (final NodeIdentifier nodeId : nodeIds) { + final URI nodeUri = new URI(scheme, null, nodeId.getApiAddress(), nodeId.getApiPort(), path, /* query */ null, /* fragment */ null); + uriMap.put(nodeId, nodeUri); + } + } catch (final URISyntaxException use) { + throw new UriConstructionException(use); + } + + // submit the requests to the nodes + final String requestId = UUID.randomUUID().toString(); + headers.put(WebClusterManager.REQUEST_ID_HEADER, requestId); + for (final Map.Entry<NodeIdentifier, URI> entry : uriMap.entrySet()) { + final NodeIdentifier nodeId = entry.getKey(); + final URI nodeUri = entry.getValue(); + final NodeHttpRequestCallable callable = (entity == null) + ? new NodeHttpRequestCallable(nodeId, method, nodeUri, parameters, headers) + : new NodeHttpRequestCallable(nodeId, method, nodeUri, entity, headers); + futureNodeHttpRequests.add(new NodeHttpRequestFutureWrapper(nodeId, method, nodeUri, completionService.submit(callable))); + } + + // get the node responses + final Set<NodeResponse> result = new HashSet<>(); + for (int i = 0; i < nodeIds.size(); i++) { + + // keeps track of the original request information in case we receive an exception + NodeHttpRequestFutureWrapper futureNodeHttpRequest = null; + try { + + // get the future resource response for the node + final Future<NodeResponse> futureNodeResourceResponse = completionService.take(); + + // find the original request by comparing the submitted future with the future returned by the completion service + for (final NodeHttpRequestFutureWrapper futureNodeHttpRequestElem : futureNodeHttpRequests) { + if (futureNodeHttpRequestElem.getFuture() == futureNodeResourceResponse) { + futureNodeHttpRequest = futureNodeHttpRequestElem; + } + } + + // try to retrieve the node response and add to result + final NodeResponse nodeResponse = futureNodeResourceResponse.get(); + result.add(nodeResponse); + + } catch (final InterruptedException | ExecutionException ex) { + + logger.warn("Node request for " + futureNodeHttpRequest.getNodeId() + " encountered exception: " + ex, ex); + + // create node response with the thrown exception and add to result + final NodeResponse nodeResponse = new NodeResponse( + futureNodeHttpRequest.getNodeId(), futureNodeHttpRequest.getHttpMethod(), futureNodeHttpRequest.getRequestUri(), ex); + result.add(nodeResponse); + + } + } + + if (logger.isDebugEnabled()) { + NodeResponse min = null; + NodeResponse max = null; + long nanosSum = 0L; + int nanosAdded = 0; + + for (final NodeResponse response : result) { + final long requestNanos = response.getRequestDuration(TimeUnit.NANOSECONDS); + final long minNanos = (min == null) ? -1 : min.getRequestDuration(TimeUnit.NANOSECONDS); + final long maxNanos = (max == null) ? -1 : max.getRequestDuration(TimeUnit.NANOSECONDS); + + if (requestNanos < minNanos || minNanos < 0L) { + min = response; + } + + if (requestNanos > maxNanos || maxNanos < 0L) { + max = response; + } + + if (requestNanos >= 0L) { + nanosSum += requestNanos; + nanosAdded++; + } + } + + final StringBuilder sb = new StringBuilder(); + sb.append("Node Responses for ").append(method).append(" ").append(path).append(" (Request ID ").append(requestId).append("):\n"); + for (final NodeResponse response : result) { + sb.append(response).append("\n"); + } + + final long averageNanos = (nanosAdded == 0) ? -1L : nanosSum / nanosAdded; + final long averageMillis = (averageNanos < 0) ? averageNanos : TimeUnit.MILLISECONDS.convert(averageNanos, TimeUnit.NANOSECONDS); + logger.debug("For {} {} (Request ID {}), minimum response time = {}, max = {}, average = {} ms", + method, path, requestId, min, max, averageMillis); + logger.debug(sb.toString()); + } + + return result; + } + + /** + * Wraps a future node response with info from originating request. This + * coupling allows for futures that encountered exceptions to be linked back + * to the failing node and better reported. + */ + private class NodeHttpRequestFutureWrapper { + + private final NodeIdentifier nodeId; + + private final String httpMethod; + + private final URI requestUri; + + private final Future<NodeResponse> future; + + public NodeHttpRequestFutureWrapper(final NodeIdentifier nodeId, final String httpMethod, + final URI requestUri, final Future<NodeResponse> future) { + if (nodeId == null) { + throw new IllegalArgumentException("Node ID may not be null."); + } else if (StringUtils.isBlank(httpMethod)) { + throw new IllegalArgumentException("Http method may not be null or empty."); + } else if (requestUri == null) { + throw new IllegalArgumentException("Request URI may not be null."); + } else if (future == null) { + throw new IllegalArgumentException("Future may not be null."); + } + this.nodeId = nodeId; + this.httpMethod = httpMethod; + this.requestUri = requestUri; + this.future = future; + } + + public NodeIdentifier getNodeId() { + return nodeId; + } + + public String getHttpMethod() { + return httpMethod; + } + + public URI getRequestUri() { + return requestUri; + } + + public Future<NodeResponse> getFuture() { + return future; + } + } + + /** + * A Callable for making an HTTP request to a single node and returning its + * response. + */ + private class NodeHttpRequestCallable implements Callable<NodeResponse> { + + private final NodeIdentifier nodeId; + private final String method; + private final URI uri; + private final Object entity; + private final Map<String, List<String>> parameters = new HashMap<>(); + private final Map<String, String> headers = new HashMap<>(); + + private NodeHttpRequestCallable(final NodeIdentifier nodeId, final String method, + final URI uri, final Object entity, final Map<String, String> headers) { + this.nodeId = nodeId; + this.method = method; + this.uri = uri; + this.entity = entity; + this.headers.putAll(headers); + } + + private NodeHttpRequestCallable(final NodeIdentifier nodeId, final String method, + final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers) { + this.nodeId = nodeId; + this.method = method; + this.uri = uri; + this.entity = null; + this.parameters.putAll(parameters); + this.headers.putAll(headers); + } + + @Override + public NodeResponse call() { + + try { + // create and send the request + final WebResource.Builder resourceBuilder = getResourceBuilder(); + final String requestId = headers.get("x-nifi-request-id"); + + final long startNanos = System.nanoTime(); + final ClientResponse clientResponse; + if (HttpMethod.DELETE.equalsIgnoreCase(method)) { + clientResponse = resourceBuilder.delete(ClientResponse.class); + } else if (HttpMethod.GET.equalsIgnoreCase(method)) { + clientResponse = resourceBuilder.get(ClientResponse.class); + } else if (HttpMethod.HEAD.equalsIgnoreCase(method)) { + clientResponse = resourceBuilder.head(); + } else if (HttpMethod.OPTIONS.equalsIgnoreCase(method)) { + clientResponse = resourceBuilder.options(ClientResponse.class); + } else if (HttpMethod.POST.equalsIgnoreCase(method)) { + clientResponse = resourceBuilder.post(ClientResponse.class); + } else if (HttpMethod.PUT.equalsIgnoreCase(method)) { + clientResponse = resourceBuilder.put(ClientResponse.class); + } else { + throw new IllegalArgumentException("HTTP Method '" + method + "' not supported for request replication."); + } + + // create and return the response + return new NodeResponse(nodeId, method, uri, clientResponse, System.nanoTime() - startNanos, requestId); + + } catch (final UniformInterfaceException | IllegalArgumentException t) { + return new NodeResponse(nodeId, method, uri, t); + } + + } + + private WebResource.Builder getResourceBuilder() { + + // convert parameters to a more convenient data structure + final MultivaluedMap<String, String> map = new MultivaluedMapImpl(); + map.putAll(parameters); + + // create the resource + WebResource resource = client.resource(uri); + + if (WebClusterManager.isResponseInterpreted(uri, method)) { + resource.addFilter(new GZIPContentEncodingFilter(false)); + } + + // set the parameters as either query parameters or as request body + final WebResource.Builder builder; + if (HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.HEAD.equalsIgnoreCase(method) || HttpMethod.GET.equalsIgnoreCase(method) || HttpMethod.OPTIONS.equalsIgnoreCase(method)) { + resource = resource.queryParams(map); + builder = resource.getRequestBuilder(); + } else { + if (entity == null) { + builder = resource.entity(map); + } else { + builder = resource.entity(entity); + } + } + + // set headers + boolean foundContentType = false; + for (final Map.Entry<String, String> entry : headers.entrySet()) { + builder.header(entry.getKey(), entry.getValue()); + if (entry.getKey().equalsIgnoreCase("content-type")) { + foundContentType = true; + } + } + + // set default content type + if (!foundContentType) { + // set default content type + builder.type(MediaType.APPLICATION_FORM_URLENCODED); + } + + return builder; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java new file mode 100644 index 0000000..afade7e --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.manager.impl; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.nifi.cluster.manager.HttpResponseMapper; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.node.Node; +import org.apache.nifi.cluster.node.Node.Status; +import org.apache.nifi.logging.NiFiLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Determines the status of nodes based on their HTTP response codes. + * + * The algorithm is as follows. + * + * If any HTTP responses were 2XX, then disconnect non-2XX responses. This is + * because 2XX may have changed a node's flow. + * + * If no 2XX responses were received, then the node's flow has not changed. + * Instead of disconnecting everything, we only disconnect the nodes with + * internal errors, i.e., 5XX responses. + * + * @author unattributed + */ +public class HttpResponseMapperImpl implements HttpResponseMapper { + + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(HttpResponseMapperImpl.class)); + + @Override + public Map<NodeResponse, Status> map(final URI requestURI, final Set<NodeResponse> nodeResponses) { + + final Map<NodeResponse, Status> result = new HashMap<>(); + + // check if any responses were 2XX + boolean found2xx = false; + for (final NodeResponse nodeResponse : nodeResponses) { + if (nodeResponse.is2xx()) { + found2xx = true; + break; + } + } + + // determine the status of each node + for (final NodeResponse nodeResponse : nodeResponses) { + + final Node.Status status; + if (found2xx) { + // disconnect nodes with non-2XX responses + status = nodeResponse.is2xx() + ? Node.Status.CONNECTED + : Node.Status.DISCONNECTED; + } else { + // disconnect nodes with 5XX responses or exception + status = nodeResponse.is5xx() + ? Node.Status.DISCONNECTED + : Node.Status.CONNECTED; + } + + result.put(nodeResponse, status); + } + + return result; + } + +}