http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java new file mode 100644 index 0000000..56432d5 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +public enum TransferDirection { + + SEND, + RECEIVE; +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java new file mode 100644 index 0000000..bfccd98 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote; + +public interface VersionedRemoteResource { + + VersionNegotiator getVersionNegotiator(); + + String getResourceName(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java new file mode 100644 index 0000000..b4206b3 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.codec; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.remote.VersionedRemoteResource; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.TransmissionDisabledException; + +/** + * <p> + * Provides a mechanism for encoding and decoding FlowFiles as streams so that + * they can be transferred remotely. + * </p> + */ +public interface FlowFileCodec extends VersionedRemoteResource { + + /** + * Returns a List of all versions that this codec is able to support, in the + * order that they are preferred by the codec + * + * @return the supported versions, most preferred first + */ + public List<Integer> getSupportedVersions(); + + /** + * Encodes a FlowFile and its content as a single stream of data and writes + * that stream to the output. + * + * @param flowFile the FlowFile to encode + * @param session a session that can be used to transactionally create and + * transfer flow files + * @param outStream the stream to write the data to + * + * @return the updated FlowFile + * + * @throws IOException + * @throws TransmissionDisabledException + */ + FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream outStream) throws IOException, TransmissionDisabledException; + + /** + * Decodes the contents of the InputStream, interpreting the data to + * determine the next FlowFile's attributes and content, as well as their + * destinations.
+ * + * @param stream an InputStream containing FlowFiles' contents, attributes, + * and destinations + * @param session + * + * @return the FlowFile that was created, or <code>null</code> if the stream + * was out of data + * + * @throws IOException + * @throws ProtocolException if the input is malformed + * @throws TransmissionDisabledException + */ + FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java new file mode 100644 index 0000000..f6c2f4f --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.exception; + +public class BadRequestException extends Exception { + + private static final long serialVersionUID = -8034602852256106560L; + + public BadRequestException(final String message) { + super(message); + } + + public BadRequestException(final Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java new file mode 100644 index 0000000..b61fc65 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.exception; + +public class HandshakeException extends Exception { + + private static final long serialVersionUID = 178192341908726L; + + public HandshakeException(final String message) { + super(message); + } + + public HandshakeException(final Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java new file mode 100644 index 0000000..24ff3a5 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.exception; + +public class NotAuthorizedException extends Exception { + + private static final long serialVersionUID = 2952623568114035498L; + + public NotAuthorizedException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java new file mode 100644 index 0000000..af0f467 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.exception; + +public class PortNotRunningException extends Exception { + + private static final long serialVersionUID = -2790940982005516375L; + + public PortNotRunningException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java new file mode 100644 index 0000000..0f50b98 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.exception; + +public class ProtocolException extends Exception { + + private static final long serialVersionUID = 5763900324505818495L; + + public ProtocolException(final String message, final Throwable cause) { + super(message, cause); + } + + public ProtocolException(final String message) { + super(message); + } + + public ProtocolException(final Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java new file mode 100644 index 0000000..dd675b3 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.exception; + +/** + * Used to indicate that by the time the request was serviced, it had already + * expired + */ +public class RequestExpiredException extends Exception { + + private static final long serialVersionUID = -7037025330562827852L; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java new file mode 100644 index 0000000..e6a0fe7 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.exception; + +public class UnknownPortException extends Exception { + + private static final long serialVersionUID = -2790940982005516375L; + + public UnknownPortException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java new file mode 100644 index 0000000..32274eb --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.protocol; + +import java.io.IOException; +import java.util.Set; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.VersionedRemoteResource; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.PortNotRunningException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.UnknownPortException; + +public interface ClientProtocol extends VersionedRemoteResource { + + void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException; + + Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, ProtocolException; + + FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException; + + void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; + + void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; + + void shutdown(Peer peer) throws IOException, ProtocolException; + + boolean isReadyForFileTransfer(); + + /** + * returns <code>true</code> if remote instance indicates that the port is + * invalid + * + * @return + * @throws IllegalStateException if a handshake has not successfully + * completed + */ + boolean isPortInvalid() throws IllegalStateException; + + /** + * returns <code>true</code> if remote instance indicates that the port is + * unknown + * + * @return + * @throws IllegalStateException if a handshake has not successfully + * completed + */ + boolean isPortUnknown(); + + /** + * returns <code>true</code> if remote instance indicates that the port's + * destination is full + * + * 
@return + * @throws IllegalStateException if a handshake has not successfully + * completed + */ + boolean isDestinationFull(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java new file mode 100644 index 0000000..d2e2946 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.protocol; + +import java.io.IOException; +import java.io.InputStream; + +public interface CommunicationsInput { + + InputStream getInputStream() throws IOException; + + long getBytesRead(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java new file mode 100644 index 0000000..95cab29 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.protocol; + +import java.io.IOException; +import java.io.OutputStream; + +public interface CommunicationsOutput { + + OutputStream getOutputStream() throws IOException; + + long getBytesWritten(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java new file mode 100644 index 0000000..d009cec --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.protocol; + +import java.io.Closeable; +import java.io.IOException; + +public interface CommunicationsSession extends Closeable { + + public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'}; + + CommunicationsInput getInput(); + + CommunicationsOutput getOutput(); + + void setTimeout(int millis) throws IOException; + + int getTimeout() throws IOException; + + void setUri(String uri); + + String getUri(); + + String getUserDn(); + + void setUserDn(String dn); + + boolean isDataAvailable(); + + long getBytesWritten(); + + long getBytesRead(); + + /** + * Asynchronously interrupts this CommunicationsSession. Implementations must ensure + * that they stop sending and receiving data as soon as possible after this + * method has been called, even if doing so results in sending only partial + * data to the peer. This will usually result in the peer throwing a + * SocketTimeoutException. + */ + void interrupt(); + + /** + * Returns <code>true</code> if the connection is closed, <code>false</code> + * otherwise. + * + * @return <code>true</code> if the connection is closed + */ + boolean isClosed(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java new file mode 100644 index 0000000..41334fe --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public enum RequestType { + + NEGOTIATE_FLOWFILE_CODEC, + REQUEST_PEER_LIST, + SEND_FLOWFILES, + RECEIVE_FLOWFILES, + SHUTDOWN; + + public void writeRequestType(final DataOutputStream dos) throws IOException { + dos.writeUTF(name()); + } + + public static RequestType readRequestType(final DataInputStream dis) throws IOException { + final String requestTypeVal = dis.readUTF(); + try { + return RequestType.valueOf(requestTypeVal); + } catch (final Exception e) { + throw new IOException("Could not determine RequestType: received invalid value " + requestTypeVal, e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java new file mode 100644 index 0000000..0d18f2e --- /dev/null +++ 
b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.IOException; + +import org.apache.nifi.cluster.NodeInformant; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.RootGroupPort; +import org.apache.nifi.remote.VersionedRemoteResource; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.BadRequestException; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.ProtocolException; + +public interface ServerProtocol extends VersionedRemoteResource { + + /** + * + * @param rootGroup + */ + void setRootProcessGroup(ProcessGroup rootGroup); + + RootGroupPort getPort(); + + /** + * Optional operation. Sets the NodeInformant to use in this Protocol, if a + * NodeInformant is supported. 
Otherwise, throws + * UnsupportedOperationException + * + * @param nodeInformant + */ + void setNodeInformant(NodeInformant nodeInformant); + + /** + * Receives the handshake from the Peer + * + * @param peer + * @throws IOException + * @throws HandshakeException + */ + void handshake(Peer peer) throws IOException, HandshakeException; + + /** + * Returns <code>true</code> if the handshaking process was completed + * successfully, <code>false</code> if either the handshaking process has + * not happened or the handshake failed + * + * @return + */ + boolean isHandshakeSuccessful(); + + /** + * Negotiates the FlowFileCodec that is to be used for transferring + * FlowFiles + * + * @param peer + * @return + * @throws IOException + * @throws BadRequestException + */ + FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException; + + /** + * Returns the codec that has already been negotiated by this Protocol, if + * any. + * + * @return + */ + FlowFileCodec getPreNegotiatedCodec(); + + /** + * Reads the Request Type of the next request from the Peer + * + * @return the RequestType that the peer would like to happen - or null, if + * no data available + */ + RequestType getRequestType(Peer peer) throws IOException; + + /** + * Sends FlowFiles to the specified peer + * + * @param peer + * @param context + * @param session + * @param codec + * + * @return the number of FlowFiles transferred + */ + int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; + + /** + * Receives FlowFiles from the specified peer + * + * @param peer + * @param context + * @param session + * @param codec + * @throws IOException + * + * @return the number of FlowFiles received + * @throws ProtocolException + */ + int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; + + /** + * Returns the number of milliseconds 
after a request is received for which + * the request is still valid. A valid of 0 indicates that the request will + * not expire. + * + * @return + */ + long getRequestExpiration(); + + /** + * Sends a list of all nodes in the cluster to the specified peer. If not in + * a cluster, sends info about itself + * + * @param peer + */ + void sendPeerList(Peer peer) throws IOException; + + void shutdown(Peer peer); + + boolean isShutdown(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/.gitignore ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/.gitignore b/nar-bundles/framework-bundle/framework/core/.gitignore new file mode 100755 index 0000000..ea8c4bf --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/.gitignore @@ -0,0 +1 @@ +/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/pom.xml b/nar-bundles/framework-bundle/framework/core/pom.xml new file mode 100644 index 0000000..547c75d --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/pom.xml @@ -0,0 +1,130 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <artifactId>framework-core</artifactId> + <packaging>jar</packaging> + <name>NiFi Framework Core</name> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>core-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-expression-language</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-file-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-properties</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>site-to-site</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-core-flowfile-attributes</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + 
<artifactId>framework-cluster-protocol</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-logging-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>client-dto</artifactId> + </dependency> + <dependency> + <groupId>org.quartz-scheduler</groupId> + <artifactId>quartz</artifactId> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + </dependency> + <dependency> + <groupId>org.jasypt</groupId> + <artifactId>jasypt</artifactId> + </dependency> + <dependency> + <groupId>org.bouncycastle</groupId> + <artifactId>bcprov-jdk16</artifactId> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-security</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>data-provenance-utils</artifactId> + </dependency> + <dependency> + <groupId>wali</groupId> + <artifactId>wali</artifactId> + <version>3.0.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java ---------------------------------------------------------------------- diff 
--git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java new file mode 100644 index 0000000..1249657 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Set; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.jaxb.BulletinAdapter; +import org.apache.nifi.reporting.Bulletin; + +/** + * The payload of the bulletins. 
+ * + * @author unattributed + */ +@XmlRootElement +public class BulletinsPayload { + + private static final JAXBContext JAXB_CONTEXT; + + static { + try { + JAXB_CONTEXT = JAXBContext.newInstance(BulletinsPayload.class); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext."); + } + } + + private Set<Bulletin> bulletins; + + @XmlJavaTypeAdapter(BulletinAdapter.class) + public Set<Bulletin> getBulletins() { + return bulletins; + } + + public void setBulletins(final Set<Bulletin> bulletins) { + this.bulletins = bulletins; + } + + public byte[] marshal() throws ProtocolException { + final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream(); + marshal(this, payloadBytes); + return payloadBytes.toByteArray(); + } + + public static void marshal(final BulletinsPayload payload, final OutputStream os) throws ProtocolException { + try { + final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); + marshaller.marshal(payload, os); + } catch (final JAXBException je) { + throw new ProtocolException(je); + } + } + + public static BulletinsPayload unmarshal(final InputStream is) throws ProtocolException { + try { + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + return (BulletinsPayload) unmarshaller.unmarshal(is); + } catch (final JAXBException je) { + throw new ProtocolException(je); + } + } + + public static BulletinsPayload unmarshal(final byte[] bytes) throws ProtocolException { + try { + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + return (BulletinsPayload) unmarshaller.unmarshal(new ByteArrayInputStream(bytes)); + } catch (final JAXBException je) { + throw new ProtocolException(je); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java ---------------------------------------------------------------------- diff --git 
/**
 * Unchecked exception representing the exceptional case when connection to
 * the cluster fails.
 *
 * @author unattributed
 */
public class ConnectionException extends RuntimeException {

    private static final long serialVersionUID = -1378294897231234028L;

    /** Creates an exception with neither a message nor a cause. */
    public ConnectionException() {
        super();
    }

    /**
     * @param msg the detail message
     * @param cause the underlying failure
     */
    public ConnectionException(final String msg, final Throwable cause) {
        super(msg, cause);
    }

    /**
     * @param cause the underlying failure
     */
    public ConnectionException(final Throwable cause) {
        super(cause);
    }

    /**
     * @param msg the detail message
     */
    public ConnectionException(final String msg) {
        super(msg);
    }
}
/**
 * Unchecked exception representing the exceptional case when disconnection
 * from the cluster fails.
 *
 * @author unattributed
 */
public class DisconnectionException extends RuntimeException {

    private static final long serialVersionUID = 6648876367997026125L;

    /** Creates an exception with neither a message nor a cause. */
    public DisconnectionException() {
        super();
    }

    /**
     * @param msg the detail message
     * @param cause the underlying failure
     */
    public DisconnectionException(final String msg, final Throwable cause) {
        super(msg, cause);
    }

    /**
     * @param cause the underlying failure
     */
    public DisconnectionException(final Throwable cause) {
        super(cause);
    }

    /**
     * @param msg the detail message
     */
    public DisconnectionException(final String msg) {
        super(msg);
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.controller.Counter; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.diagnostics.SystemDiagnostics; +import org.apache.nifi.jaxb.CounterAdapter; + +/** + * The payload of the heartbeat. The payload contains status to inform the + * cluster manager the current workload of this node. + * + * @author unattributed + */ +@XmlRootElement +public class HeartbeatPayload { + + private static final JAXBContext JAXB_CONTEXT; + + static { + try { + JAXB_CONTEXT = JAXBContext.newInstance(HeartbeatPayload.class); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext."); + } + } + + private List<Counter> counters; + private ProcessGroupStatus processGroupStatus; + private int activeThreadCount; + private long totalFlowFileCount; + private long totalFlowFileBytes; + private SystemDiagnostics systemDiagnostics; + private Integer siteToSitePort; + private boolean siteToSiteSecure; + private long systemStartTime; + + @XmlJavaTypeAdapter(CounterAdapter.class) + public List<Counter> getCounters() { + return counters; + } + + public void setCounters(final List<Counter> counters) { + this.counters = counters; + } + + public int getActiveThreadCount() { + return activeThreadCount; + } + + public void setActiveThreadCount(final int activeThreadCount) { + this.activeThreadCount = 
activeThreadCount; + } + + public long getTotalFlowFileCount() { + return totalFlowFileCount; + } + + public void setTotalFlowFileCount(final long totalFlowFileCount) { + this.totalFlowFileCount = totalFlowFileCount; + } + + public long getTotalFlowFileBytes() { + return totalFlowFileBytes; + } + + public void setTotalFlowFileBytes(final long totalFlowFileBytes) { + this.totalFlowFileBytes = totalFlowFileBytes; + } + + public ProcessGroupStatus getProcessGroupStatus() { + return processGroupStatus; + } + + public void setProcessGroupStatus(final ProcessGroupStatus processGroupStatus) { + this.processGroupStatus = processGroupStatus; + } + + public SystemDiagnostics getSystemDiagnostics() { + return systemDiagnostics; + } + + public void setSystemDiagnostics(final SystemDiagnostics systemDiagnostics) { + this.systemDiagnostics = systemDiagnostics; + } + + public boolean isSiteToSiteSecure() { + return siteToSiteSecure; + } + + public void setSiteToSiteSecure(final boolean secure) { + this.siteToSiteSecure = secure; + } + + public Integer getSiteToSitePort() { + return siteToSitePort; + } + + public void setSiteToSitePort(final Integer port) { + this.siteToSitePort = port; + } + + public long getSystemStartTime() { + return systemStartTime; + } + + public void setSystemStartTime(final long systemStartTime) { + this.systemStartTime = systemStartTime; + } + + public byte[] marshal() throws ProtocolException { + final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream(); + marshal(this, payloadBytes); + return payloadBytes.toByteArray(); + } + + public static void marshal(final HeartbeatPayload payload, final OutputStream os) throws ProtocolException { + try { + final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); + marshaller.marshal(payload, os); + } catch (final JAXBException je) { + throw new ProtocolException(je); + } + } + + public static HeartbeatPayload unmarshal(final InputStream is) throws ProtocolException { + try { + final Unmarshaller 
unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + return (HeartbeatPayload) unmarshaller.unmarshal(is); + } catch (final JAXBException je) { + throw new ProtocolException(je); + } + } + + public static HeartbeatPayload unmarshal(final byte[] bytes) throws ProtocolException { + try { + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + return (HeartbeatPayload) unmarshaller.unmarshal(new ByteArrayInputStream(bytes)); + } catch (final JAXBException je) { + throw new ProtocolException(je); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java new file mode 100644 index 0000000..1efa0cd --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.connectable; + +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.connectable.Connection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractPort; +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.scheduling.SchedulingStrategy; + +/** + * Provides a mechanism by which <code>FlowFile</code>s can be transferred into + * and out of a <code>ProcessGroup</code> to and/or from another + * <code>ProcessGroup</code> within the same instance of NiFi. 
+ */ +public class LocalPort extends AbstractPort { + + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + public LocalPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) { + super(id, name, processGroup, type, scheduler); + } + + @Override + public boolean isValid() { + return !getConnections(Relationship.ANONYMOUS).isEmpty(); + } + + @Override + public Collection<ValidationResult> getValidationErrors() { + final Collection<ValidationResult> validationErrors = new ArrayList<>(); + if (!isValid()) { + final ValidationResult error = new ValidationResult.Builder() + .explanation(String.format("Output connection for port '%s' is not defined.", getName())) + .subject(String.format("Port '%s'", getName())) + .valid(false) + .build(); + validationErrors.add(error); + } + return validationErrors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + readLock.lock(); + try { + final List<FlowFile> flowFiles = session.get(10); + if (flowFiles.isEmpty()) { + context.yield(); + } else { + session.transfer(flowFiles, Relationship.ANONYMOUS); + } + } finally { + readLock.unlock(); + } + } + + @Override + public void updateConnection(final Connection connection) throws IllegalStateException { + writeLock.lock(); + try { + super.updateConnection(connection); + } finally { + writeLock.unlock(); + } + } + + @Override + public void addConnection(final Connection connection) throws IllegalArgumentException { + writeLock.lock(); + try { + super.addConnection(connection); + } finally { + writeLock.unlock(); + } + } + + @Override + public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException { + writeLock.lock(); + try { + super.removeConnection(connection); + } finally { + 
writeLock.unlock(); + } + } + + @Override + public Set<Connection> getConnections() { + readLock.lock(); + try { + return super.getConnections(); + } finally { + readLock.unlock(); + } + } + + @Override + public Set<Connection> getConnections(Relationship relationship) { + readLock.lock(); + try { + return super.getConnections(relationship); + } finally { + readLock.unlock(); + } + } + + @Override + public List<Connection> getIncomingConnections() { + readLock.lock(); + try { + return super.getIncomingConnections(); + } finally { + readLock.unlock(); + } + } + + @Override + public boolean hasIncomingConnection() { + readLock.lock(); + try { + return super.hasIncomingConnection(); + } finally { + readLock.unlock(); + } + } + + @Override + public boolean isTriggerWhenEmpty() { + return false; + } + + @Override + public SchedulingStrategy getSchedulingStrategy() { + return SchedulingStrategy.EVENT_DRIVEN; + } + + @Override + public boolean isSideEffectFree() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java new file mode 100644 index 0000000..1d723b5 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.connectable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.controller.FlowFileQueue; +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.StandardFlowFileQueue; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.util.NiFiProperties; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +/** + * Models a connection between connectable components. A connection may contain + * one or more relationships that map the source component to the destination + * component. 
+ */ +public final class StandardConnection implements Connection { + + private final String id; + private final AtomicReference<ProcessGroup> processGroup; + private final AtomicReference<String> name; + private final AtomicReference<List<Position>> bendPoints; + private final Connectable source; + private final AtomicReference<Connectable> destination; + private final AtomicReference<Collection<Relationship>> relationships; + private final StandardFlowFileQueue flowFileQueue; + private final AtomicInteger labelIndex = new AtomicInteger(1); + private final AtomicLong zIndex = new AtomicLong(0L); + private final ProcessScheduler scheduler; + private final int hashCode; + + private StandardConnection(final Builder builder) { + id = builder.id; + name = new AtomicReference<>(builder.name); + bendPoints = new AtomicReference<>(Collections.unmodifiableList(new ArrayList<>(builder.bendPoints))); + processGroup = new AtomicReference<>(builder.processGroup); + source = builder.source; + destination = new AtomicReference<>(builder.destination); + relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships)); + scheduler = builder.scheduler; + flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, NiFiProperties.getInstance().getQueueSwapThreshold()); + hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); + } + + public ProcessGroup getProcessGroup() { + return processGroup.get(); + } + + public String getIdentifier() { + return id; + } + + public String getName() { + return name.get(); + } + + public void setName(final String name) { + this.name.set(name); + } + + @Override + public List<Position> getBendPoints() { + return bendPoints.get(); + } + + @Override + public void setBendPoints(final List<Position> position) { + this.bendPoints.set(Collections.unmodifiableList(new ArrayList<>(position))); + } + + public int getLabelIndex() { + return labelIndex.get(); + } + + public void setLabelIndex(final int labelIndex) { + 
this.labelIndex.set(labelIndex); + } + + @Override + public long getZIndex() { + return zIndex.get(); + } + + @Override + public void setZIndex(final long zIndex) { + this.zIndex.set(zIndex); + } + + public Connectable getSource() { + return source; + } + + public Connectable getDestination() { + return destination.get(); + } + + public Collection<Relationship> getRelationships() { + return relationships.get(); + } + + public FlowFileQueue getFlowFileQueue() { + return flowFileQueue; + } + + public void setProcessGroup(final ProcessGroup newGroup) { + final ProcessGroup currentGroup = this.processGroup.get(); + try { + this.processGroup.set(newGroup); + } catch (final RuntimeException e) { + this.processGroup.set(currentGroup); + throw e; + } + } + + public void setRelationships(final Collection<Relationship> newRelationships) { + final Collection<Relationship> currentRelationships = relationships.get(); + if (currentRelationships.equals(newRelationships)) { + return; + } + + if (getSource().isRunning()) { + throw new IllegalStateException("Cannot update the relationships for Connection because the source of the Connection is running"); + } + + try { + this.relationships.set(new ArrayList<>(newRelationships)); + getSource().updateConnection(this); + } catch (final RuntimeException e) { + this.relationships.set(currentRelationships); + throw e; + } + } + + public void setDestination(final Connectable newDestination) { + final Connectable previousDestination = destination.get(); + if (previousDestination.equals(newDestination)) { + return; + } + + if (previousDestination.isRunning() && !(previousDestination instanceof Funnel || previousDestination instanceof LocalPort)) { + throw new IllegalStateException("Cannot change destination of Connection because the current destination is running"); + } + + try { + previousDestination.removeConnection(this); + this.destination.set(newDestination); + getSource().updateConnection(this); + + newDestination.addConnection(this); + 
scheduler.registerEvent(newDestination); + } catch (final RuntimeException e) { + this.destination.set(previousDestination); + throw e; + } + } + + @Override + public void lock() { + flowFileQueue.lock(); + } + + @Override + public void unlock() { + flowFileQueue.unlock(); + } + + @Override + public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) { + return flowFileQueue.poll(filter, expiredRecords); + } + + @Override + public boolean equals(final Object other) { + if (!(other instanceof Connection)) { + return false; + } + final Connection con = (Connection) other; + return new EqualsBuilder().append(id, con.getIdentifier()).isEquals(); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public String toString() { + return "Connection[ID=" + id + ",Name=" + name.get() + ",Source=" + getSource() + ",Destination=" + getDestination() + ",Relationships=" + getRelationships(); + } + + /** + * Gives this Connection ownership of the given FlowFile and allows the + * Connection to hold on to the FlowFile but NOT provide the FlowFile to + * consumers. This allows us to ensure that the Connection is not deleted + * during the middle of a Session commit. 
+ * + * @param flowFile + */ + @Override + public void enqueue(final FlowFileRecord flowFile) { + flowFileQueue.put(flowFile); + } + + @Override + public void enqueue(final Collection<FlowFileRecord> flowFiles) { + flowFileQueue.putAll(flowFiles); + } + + public static class Builder { + + private final ProcessScheduler scheduler; + + private String id = UUID.randomUUID().toString(); + private String name; + private List<Position> bendPoints = new ArrayList<>(); + private ProcessGroup processGroup; + private Connectable source; + private Connectable destination; + private Collection<Relationship> relationships; + + public Builder(final ProcessScheduler scheduler) { + this.scheduler = scheduler; + } + + public Builder id(final String id) { + this.id = id; + return this; + } + + public Builder source(final Connectable source) { + this.source = source; + return this; + } + + public Builder processGroup(final ProcessGroup group) { + this.processGroup = group; + return this; + } + + public Builder destination(final Connectable destination) { + this.destination = destination; + return this; + } + + public Builder relationships(final Collection<Relationship> relationships) { + this.relationships = new ArrayList<>(relationships); + return this; + } + + public Builder name(final String name) { + this.name = name; + return this; + } + + public Builder bendPoints(final List<Position> bendPoints) { + this.bendPoints.clear(); + this.bendPoints.addAll(bendPoints); + return this; + } + + public Builder addBendPoint(final Position bendPoint) { + bendPoints.add(bendPoint); + return this; + } + + public StandardConnection build() { + if (source == null) { + throw new IllegalStateException("Cannot build a Connection without a Source"); + } + if (destination == null) { + throw new IllegalStateException("Cannot build a Connection without a Destination"); + } + + if (relationships == null) { + relationships = new ArrayList<>(); + } + + if (relationships.isEmpty()) { + // ensure 
relationships have been specified for processors, otherwise the anonymous relationship is used + if (source.getConnectableType() == ConnectableType.PROCESSOR) { + throw new IllegalStateException("Cannot build a Connection without any relationships"); + } + relationships.add(Relationship.ANONYMOUS); + } + + return new StandardConnection(this); + } + } + + @Override + public void verifyCanUpdate() { + // StandardConnection can always be updated + } + + @Override + public void verifyCanDelete() { + if (!flowFileQueue.isEmpty()) { + throw new IllegalStateException("Queue not empty for " + this); + } + + if (source.isRunning()) { + if (!ConnectableType.FUNNEL.equals(source.getConnectableType())) { + throw new IllegalStateException("Source of Connection (" + source + ") is running"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java new file mode 100644 index 0000000..f36a459 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.controller;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.Connectables;

/**
 * A {@link WorkerQueue} that hands out {@link Worker}s for components that
 * have pending events. Each registered component has one {@link Worker} whose
 * counter tracks the outstanding events; a worker sits on the ready queue
 * while that counter is positive.
 *
 * <p>
 * The worker map and the ready queue are only accessed while synchronized on
 * an internal monitor; that monitor is also used to block callers of
 * {@link #poll(long, TimeUnit)} until work is offered.
 * </p>
 */
public class EventDrivenWorkerQueue implements WorkerQueue {

    private final Object workMonitor = new Object();

    private final Map<Connectable, Worker> workerMap = new HashMap<>(); // protected by synchronizing on workMonitor
    private final WorkerReadyQueue workerQueue;

    public EventDrivenWorkerQueue(final boolean clustered, final boolean primary, final ProcessScheduler scheduler) {
        workerQueue = new WorkerReadyQueue(scheduler);
        workerQueue.setClustered(clustered);
        workerQueue.setPrimary(primary);
    }

    @Override
    public void setClustered(final boolean clustered) {
        workerQueue.setClustered(clustered);
    }

    @Override
    public void setPrimary(final boolean primary) {
        workerQueue.setPrimary(primary);
    }

    /**
     * Waits up to the given amount of time for a worker that is ready to run.
     * The returned worker's event count is decremented; if events remain, the
     * worker is placed back on the ready queue.
     *
     * <p>
     * If the calling thread is interrupted while waiting, the method keeps
     * waiting until the deadline (as it always has) but restores the thread's
     * interrupt status before returning, so that the caller can still observe
     * the interruption.
     * </p>
     *
     * @param timeout the maximum time to wait
     * @param timeUnit the unit of {@code timeout}
     * @return a ready worker, or {@code null} if none became ready in time
     */
    @Override
    public Worker poll(final long timeout, final TimeUnit timeUnit) {
        final long maxTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
        boolean interrupted = false;
        try {
            while (System.currentTimeMillis() < maxTime) {
                synchronized (workMonitor) {
                    final Worker worker = workerQueue.poll();
                    if (worker != null) {
                        // Decrement the amount of work there is to do for this worker.
                        final int workLeft = worker.decrementEventCount();
                        if (workLeft > 0) {
                            workerQueue.offer(worker);
                        }

                        return worker;
                    }

                    // nothing to do. wait until we have something to do.
                    final long timeLeft = maxTime - System.currentTimeMillis();
                    if (timeLeft <= 0) {
                        return null;
                    }

                    try {
                        workMonitor.wait(timeLeft);
                    } catch (final InterruptedException e) {
                        // wait() cleared the interrupt status; remember it and
                        // re-assert it once we are done waiting.
                        interrupted = true;
                    }
                }
            }

            return null;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Records an event for the given component and wakes up one thread waiting
     * in {@link #poll(long, TimeUnit)}. Events for components that have not
     * been registered through {@link #resumeWork(Connectable)} are ignored.
     *
     * @param connectable the component that has work to do
     */
    @Override
    public void offer(final Connectable connectable) {
        synchronized (workMonitor) {
            final Worker worker = workerMap.get(connectable);
            if (worker == null) {
                // if worker is null, then it has not been scheduled to run; ignore the event.
                return;
            }

            final int countBefore = worker.incrementEventCount();
            if (countBefore < 0) {
                // The counter had dropped below zero; this event makes it exactly one.
                worker.setWorkCount(1);
            }
            if (countBefore <= 0) {
                // If countBefore > 0 then it's already on the queue, so just incrementing its counter is sufficient.
                workerQueue.offer(worker);
            }

            workMonitor.notify();
        }
    }

    /**
     * Sums the object counts of the queues of all incoming connections of the
     * given component.
     */
    private int getWorkCount(final Connectable connectable) {
        int sum = 0;
        for (final Connection connection : connectable.getIncomingConnections()) {
            sum += connection.getFlowFileQueue().size().getObjectCount();
        }
        return sum;
    }

    /**
     * Registers a fresh worker for the given component, replacing any worker
     * previously registered for it. If the component's incoming connections
     * already hold objects, the worker is queued immediately with that count.
     *
     * @param connectable the component to start accepting events for
     */
    @Override
    public void resumeWork(final Connectable connectable) {
        synchronized (workMonitor) {
            final int workCount = getWorkCount(connectable);
            final Worker worker = new Worker(connectable);
            workerMap.put(connectable, worker);

            if (workCount > 0) {
                worker.setWorkCount(workCount);
                workerQueue.offer(worker);
                workMonitor.notify();
            }
        }
    }

    /**
     * Unregisters the worker of the given component, resets its event count
     * and removes it from the ready queue. Does nothing if the component has
     * no registered worker.
     *
     * @param connectable the component to stop accepting events for
     */
    @Override
    public void suspendWork(final Connectable connectable) {
        synchronized (workMonitor) {
            final Worker worker = this.workerMap.remove(connectable);
            if (worker == null) {
                return;
            }

            worker.resetWorkCount();
            workerQueue.remove(worker);
        }
    }

    /**
     * Pairs a component with a counter of its outstanding events.
     */
    public static class Worker implements EventBasedWorker {

        private final Connectable connectable;
        private final AtomicInteger workCount = new AtomicInteger(0);

        public Worker(final Connectable connectable) {
            this.connectable = connectable;
        }

        @Override
        public Connectable getConnectable() {
            return connectable;
        }

        /**
         * @return the event count <em>after</em> the decrement
         */
        @Override
        public int decrementEventCount() {
            return workCount.decrementAndGet();
        }

        /**
         * @return the event count <em>before</em> the increment
         */
        @Override
        public int incrementEventCount() {
            return workCount.getAndIncrement();
        }

        void resetWorkCount() {
            workCount.set(0);
        }

        void setWorkCount(final int workCount) {
            this.workCount.set(workCount);
        }
    }

    /**
     * Queue of workers with pending events. {@link #poll()} skips workers that
     * cannot run right now. This class does no locking of its own; the
     * enclosing class accesses it while synchronized on its work monitor.
     */
    @SuppressWarnings("serial")
    private static class WorkerReadyQueue extends LinkedList<Worker> {

        private final ProcessScheduler scheduler;

        private volatile boolean clustered = false;
        private volatile boolean primary = false;

        public WorkerReadyQueue(final ProcessScheduler scheduler) {
            this.scheduler = scheduler;
        }

        public void setClustered(final boolean clustered) {
            this.clustered = clustered;
        }

        public void setPrimary(final boolean primary) {
            this.primary = primary;
        }

        /**
         * Removes and returns the first worker that has no reason to be
         * delayed. Workers that were inspected but are not ready are appended
         * back to the queue, except those that are not running: these are
         * dropped and their event count is reset.
         *
         * @return a ready worker, or {@code null} if no queued worker is ready
         */
        @Override
        public Worker poll() {
            final List<Worker> putBack = new ArrayList<>();

            Worker worker;
            try {
                while ((worker = super.poll()) != null) {
                    final DelayProcessingReason reason = getDelayReason(worker);
                    if (reason == null) {
                        return worker;
                    }

                    // Worker is not ready. We may want to add him back to the queue, depending on the reason that he is unready.
                    switch (reason) {
                        case YIELDED:
                        case ISOLATED:
                        case DESTINATION_FULL:
                        case ALL_WORK_PENALIZED:
                        case NO_WORK:
                        case TOO_MANY_THREADS:
                            // there will not be an event that triggers this to happen, so we add this worker back to the queue.
                            putBack.add(worker);
                            break;
                        default:
                        case NOT_RUNNING:
                            // There's no need to check if this worker is available again until another event
                            // occurs. Therefore, we keep him off of the queue and reset his work count
                            worker.resetWorkCount();
                            break;
                    }
                }
            } finally {
                // Runs on both the early return and the exhausted-queue path.
                if (!putBack.isEmpty()) {
                    super.addAll(putBack);
                }
            }

            return null;
        }

        /**
         * Determines why the given worker cannot run right now.
         *
         * @return the reason for delaying the worker, or {@code null} if it
         * can run
         */
        private DelayProcessingReason getDelayReason(final Worker worker) {
            final Connectable connectable = worker.getConnectable();

            if (ScheduledState.RUNNING != connectable.getScheduledState()) {
                return DelayProcessingReason.NOT_RUNNING;
            }

            if (connectable.getYieldExpiration() > System.currentTimeMillis()) {
                return DelayProcessingReason.YIELDED;
            }

            // Destination availability is only checked for components that declare relationships.
            // NOTE(review): the original comment here read "For Remote Output Ports," and was left
            // unfinished; presumably such ports declare no relationships -- confirm.
            int availableRelationshipCount = 0;
            if (!connectable.getRelationships().isEmpty()) {
                availableRelationshipCount = getAvailableRelationshipCount(connectable);

                if (availableRelationshipCount == 0) {
                    return DelayProcessingReason.DESTINATION_FULL;
                }
            }

            if (connectable.hasIncomingConnection() && !Connectables.flowFilesQueued(connectable)) {
                return DelayProcessingReason.NO_WORK;
            }

            final int activeThreadCount = scheduler.getActiveThreadCount(connectable);
            final int maxThreadCount = connectable.getMaxConcurrentTasks();
            if (maxThreadCount > 0 && activeThreadCount >= maxThreadCount) {
                return DelayProcessingReason.TOO_MANY_THREADS;
            }

            if (connectable instanceof ProcessorNode) {
                final ProcessorNode procNode = (ProcessorNode) connectable;
                if (procNode.isIsolated() && clustered && !primary) {
                    return DelayProcessingReason.ISOLATED;
                }

                final boolean triggerWhenAnyAvailable = procNode.isTriggerWhenAnyDestinationAvailable();
                final boolean allDestinationsAvailable = availableRelationshipCount == procNode.getRelationships().size();
                if (!triggerWhenAnyAvailable && !allDestinationsAvailable) {
                    return DelayProcessingReason.DESTINATION_FULL;
                }
            }

            return null;
        }

        /**
         * Counts the relationships of the given component that can currently
         * accept output. A relationship without connections counts only if it
         * is auto-terminated; a relationship with connections counts if none
         * of its connections (self-loops excluded) has a full queue.
         */
        private int getAvailableRelationshipCount(final Connectable connectable) {
            int count = 0;
            for (final Relationship relationship : connectable.getRelationships()) {
                final Collection<Connection> connections = connectable.getConnections(relationship);

                if (connections == null || connections.isEmpty()) {
                    if (connectable.isAutoTerminated(relationship)) {
                        // If the relationship is auto-terminated, consider it available.
                        count++;
                    }
                } else {
                    boolean available = true;
                    for (final Connection connection : connections) {
                        if (connection.getSource() == connection.getDestination()) {
                            // don't count self-loops
                            continue;
                        }

                        if (connection.getFlowFileQueue().isFull()) {
                            // One full queue is enough to make the relationship unavailable.
                            available = false;
                            break;
                        }
                    }

                    if (available) {
                        count++;
                    }
                }
            }

            return count;
        }
    }

    /**
     * Reasons for which a queued worker is not handed out by the ready queue.
     */
    private enum DelayProcessingReason {

        YIELDED,
        DESTINATION_FULL,
        NO_WORK,
        ALL_WORK_PENALIZED,
        ISOLATED,
        NOT_RUNNING,
        TOO_MANY_THREADS;
    }
}