http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java deleted file mode 100644 index 4afdfb7..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -import java.util.Map; -import java.util.Set; - -import org.apache.nifi.connectable.Port; -import org.apache.nifi.remote.exception.BadRequestException; -import org.apache.nifi.remote.exception.NotAuthorizedException; -import org.apache.nifi.remote.exception.RequestExpiredException; -import org.apache.nifi.remote.protocol.ServerProtocol; - -public interface RootGroupPort extends Port { - - boolean isTransmitting(); - - void setGroupAccessControl(Set<String> groups); - - Set<String> getGroupAccessControl(); - - void setUserAccessControl(Set<String> users); - - Set<String> getUserAccessControl(); - - /** - * Verifies that the specified user is authorized to interact with this port - * and returns a {@link PortAuthorizationResult} indicating why the user is - * unauthorized if this assumption fails - * - * @param dn - * @return - */ - PortAuthorizationResult checkUserAuthorization(String dn); - - /** - * Receives data from the given stream - * - * @param peer - * @param serverProtocol - * @param requestHeaders - * - * @return the number of FlowFiles received - * @throws org.apache.nifi.remote.exception.NotAuthorizedException - * @throws org.apache.nifi.remote.exception.BadRequestException - * @throws org.apache.nifi.remote.exception.RequestExpiredException - */ - int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException; - - /** - * Transfers data to the given stream - * - * @param peer - * @param requestHeaders - * @param serverProtocol - * - * @return the number of FlowFiles transferred - * @throws org.apache.nifi.remote.exception.NotAuthorizedException - * @throws org.apache.nifi.remote.exception.BadRequestException - * @throws org.apache.nifi.remote.exception.RequestExpiredException - */ - int transferFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException; - -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java deleted file mode 100644 index 56432d5..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -public enum TransferDirection { - - SEND, - RECEIVE; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java deleted file mode 100644 index bfccd98..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -public interface VersionedRemoteResource { - - VersionNegotiator getVersionNegotiator(); - - String getResourceName(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java deleted file mode 100644 index b4206b3..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.codec; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.remote.VersionedRemoteResource; -import org.apache.nifi.remote.exception.ProtocolException; -import org.apache.nifi.remote.exception.TransmissionDisabledException; - -/** - * <p> - * Provides a mechanism for encoding and decoding FlowFiles as streams so that - * they can be transferred remotely. - * </p> - */ -public interface FlowFileCodec extends VersionedRemoteResource { - - /** - * Returns a List of all versions that this codec is able to support, in the - * order that they are preferred by the codec - * - * @return - */ - public List<Integer> getSupportedVersions(); - - /** - * Encodes a FlowFile and its content as a single stream of data and writes - * that stream to the output. If checksum is not null, it will be calculated - * as the stream is read - * - * @param flowFile the FlowFile to encode - * @param session a session that can be used to transactionally create and - * transfer flow files - * @param outStream the stream to write the data to - * - * @return the updated FlowFile - * - * @throws IOException - */ - FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream outStream) throws IOException, TransmissionDisabledException; - - /** - * Decodes the contents of the InputStream, interpreting the data to - * determine the next FlowFile's attributes and content, as well as their - * destinations. If not null, checksum will be used to calculate the - * checksum as the data is read. - * - * @param stream an InputStream containing FlowFiles' contents, attributes, - * and destinations - * @param session - * - * @return the FlowFile that was created, or <code>null</code> if the stream - * was out of data - * - * @throws IOException - * @throws ProtocolException if the input is malformed - */ - FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java deleted file mode 100644 index f6c2f4f..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -public class BadRequestException extends Exception { - - private static final long serialVersionUID = -8034602852256106560L; - - public BadRequestException(final String message) { - super(message); - } - - public BadRequestException(final Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java deleted file mode 100644 index b61fc65..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -public class HandshakeException extends Exception { - - private static final long serialVersionUID = 178192341908726L; - - public HandshakeException(final String message) { - super(message); - } - - public HandshakeException(final Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java deleted file mode 100644 index 24ff3a5..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -public class NotAuthorizedException extends Exception { - - private static final long serialVersionUID = 2952623568114035498L; - - public NotAuthorizedException(final String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java deleted file mode 100644 index af0f467..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -public class PortNotRunningException extends Exception { - - private static final long serialVersionUID = -2790940982005516375L; - - public PortNotRunningException(final String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java deleted file mode 100644 index 0f50b98..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -public class ProtocolException extends Exception { - - private static final long serialVersionUID = 5763900324505818495L; - - public ProtocolException(final String message, final Throwable cause) { - super(message, cause); - } - - public ProtocolException(final String message) { - super(message); - } - - public ProtocolException(final Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java deleted file mode 100644 index dd675b3..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -/** - * Used to indicate that by the time the request was serviced, it had already - * expired - */ -public class RequestExpiredException extends Exception { - - private static final long serialVersionUID = -7037025330562827852L; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java deleted file mode 100644 index e6a0fe7..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -public class UnknownPortException extends Exception { - - private static final long serialVersionUID = -2790940982005516375L; - - public UnknownPortException(final String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java deleted file mode 100644 index 32274eb..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol; - -import java.io.IOException; -import java.util.Set; - -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.remote.Peer; -import org.apache.nifi.remote.PeerStatus; -import org.apache.nifi.remote.VersionedRemoteResource; -import org.apache.nifi.remote.codec.FlowFileCodec; -import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.PortNotRunningException; -import org.apache.nifi.remote.exception.ProtocolException; -import org.apache.nifi.remote.exception.UnknownPortException; - -public interface ClientProtocol extends VersionedRemoteResource { - - void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException; - - Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, ProtocolException; - - FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException; - - void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; - - void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; - - void shutdown(Peer peer) throws IOException, ProtocolException; - - boolean isReadyForFileTransfer(); - - /** - * returns <code>true</code> if remote instance indicates that the port is - * invalid - * - * @return - * @throws IllegalStateException if a handshake has not successfully - * completed - */ - boolean isPortInvalid() throws IllegalStateException; - - /** - * returns <code>true</code> if remote instance indicates that the port is - * unknown - * - * @return - * @throws IllegalStateException if a handshake has not successfully - * completed - */ - boolean isPortUnknown(); - - /** - * returns <code>true</code> if remote instance indicates that the port's - * destination is full - * - * @return - * @throws IllegalStateException if a handshake has not successfully - * completed - */ - boolean isDestinationFull(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java deleted file mode 100644 index d2e2946..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol; - -import java.io.IOException; -import java.io.InputStream; - -public interface CommunicationsInput { - - InputStream getInputStream() throws IOException; - - long getBytesRead(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java deleted file mode 100644 index 95cab29..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol; - -import java.io.IOException; -import java.io.OutputStream; - -public interface CommunicationsOutput { - - OutputStream getOutputStream() throws IOException; - - long getBytesWritten(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java deleted file mode 100644 index d009cec..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol; - -import java.io.Closeable; -import java.io.IOException; - -public interface CommunicationsSession extends Closeable { - - public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'}; - - CommunicationsInput getInput(); - - CommunicationsOutput getOutput(); - - void setTimeout(int millis) throws IOException; - - int getTimeout() throws IOException; - - void setUri(String uri); - - String getUri(); - - String getUserDn(); - - void setUserDn(String dn); - - boolean isDataAvailable(); - - long getBytesWritten(); - - long getBytesRead(); - - /** - * Asynchronously interrupts this FlowFileCodec. Implementations must ensure - * that they stop sending and receiving data as soon as possible after this - * method has been called, even if doing so results in sending only partial - * data to the peer. This will usually result in the peer throwing a - * SocketTimeoutException. - */ - void interrupt(); - - /** - * Returns <code>true</code> if the connection is closed, <code>false</code> - * otherwise. - * - * @return - */ - boolean isClosed(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java deleted file mode 100644 index 41334fe..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -public enum RequestType { - - NEGOTIATE_FLOWFILE_CODEC, - REQUEST_PEER_LIST, - SEND_FLOWFILES, - RECEIVE_FLOWFILES, - SHUTDOWN; - - public void writeRequestType(final DataOutputStream dos) throws IOException { - dos.writeUTF(name()); - } - - public static RequestType readRequestType(final DataInputStream dis) throws IOException { - final String requestTypeVal = dis.readUTF(); - try { - return RequestType.valueOf(requestTypeVal); - } catch (final Exception e) { - throw new IOException("Could not determine RequestType: received invalid value " + requestTypeVal); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java deleted file mode 100644 index 0d18f2e..0000000 --- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol; - -import java.io.IOException; - -import org.apache.nifi.cluster.NodeInformant; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.remote.Peer; -import org.apache.nifi.remote.RootGroupPort; -import org.apache.nifi.remote.VersionedRemoteResource; -import org.apache.nifi.remote.codec.FlowFileCodec; -import org.apache.nifi.remote.exception.BadRequestException; -import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.ProtocolException; - -public interface ServerProtocol extends VersionedRemoteResource { - - /** - * - * @param rootGroup - */ - void setRootProcessGroup(ProcessGroup rootGroup); - - RootGroupPort getPort(); - - /** - * Optional operation. Sets the NodeInformant to use in this Protocol, if a - * NodeInformant is supported. Otherwise, throws - * UnsupportedOperationException - * - * @param nodeInformant - */ - void setNodeInformant(NodeInformant nodeInformant); - - /** - * Receives the handshake from the Peer - * - * @param peer - * @throws IOException - * @throws HandshakeException - */ - void handshake(Peer peer) throws IOException, HandshakeException; - - /** - * Returns <code>true</code> if the handshaking process was completed - * successfully, <code>false</code> if either the handshaking process has - * not happened or the handshake failed - * - * @return - */ - boolean isHandshakeSuccessful(); - - /** - * Negotiates the FlowFileCodec that is to be used for transferring - * FlowFiles - * - * @param peer - * @return - * @throws IOException - * @throws BadRequestException - */ - FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException; - - /** - * Returns the codec that has already been negotiated by this Protocol, if - * any. - * - * @return - */ - FlowFileCodec getPreNegotiatedCodec(); - - /** - * Reads the Request Type of the next request from the Peer - * - * @return the RequestType that the peer would like to happen - or null, if - * no data available - */ - RequestType getRequestType(Peer peer) throws IOException; - - /** - * Sends FlowFiles to the specified peer - * - * @param peer - * @param context - * @param session - * @param codec - * - * @return the number of FlowFiles transferred - */ - int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; - - /** - * Receives FlowFiles from the specified peer - * - * @param peer - * @param context - * @param session - * @param codec - * @throws IOException - * - * @return the number of FlowFiles received - * @throws ProtocolException - */ - int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; - - /** - * Returns the number of milliseconds after a request is received for which - * the request is still valid. A valid of 0 indicates that the request will - * not expire. - * - * @return - */ - long getRequestExpiration(); - - /** - * Sends a list of all nodes in the cluster to the specified peer. If not in - * a cluster, sends info about itself - * - * @param peer - */ - void sendPeerList(Peer peer) throws IOException; - - void shutdown(Peer peer); - - boolean isShutdown(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/.gitignore ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/.gitignore b/nar-bundles/framework-bundle/framework/core/.gitignore deleted file mode 100755 index ea8c4bf..0000000 --- a/nar-bundles/framework-bundle/framework/core/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/pom.xml b/nar-bundles/framework-bundle/framework/core/pom.xml deleted file mode 100644 index 47b52ea..0000000 --- a/nar-bundles/framework-bundle/framework/core/pom.xml +++ /dev/null @@ -1,121 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-parent</artifactId> - <version>0.0.1-SNAPSHOT</version> - </parent> - <artifactId>framework-core</artifactId> - <packaging>jar</packaging> - <name>NiFi Framework Core</name> - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>core-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-expression-language</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>site-to-site</artifactId> - </dependency> - <dependency> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>framework-cluster-protocol</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-logging-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>client-dto</artifactId> - </dependency> - <dependency> - <groupId>org.quartz-scheduler</groupId> - <artifactId>quartz</artifactId> - </dependency> - <dependency> - <groupId>com.h2database</groupId> - <artifactId>h2</artifactId> - </dependency> - <dependency> - <groupId>org.jasypt</groupId> - <artifactId>jasypt</artifactId> - </dependency> - <dependency> - <groupId>org.bouncycastle</groupId> - <artifactId>bcprov-jdk16</artifactId> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-client</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-security</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-core</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-beans</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-context</artifactId> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>data-provenance-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>wali</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-mock</artifactId> - <scope>test</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java deleted file mode 100644 index 1249657..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/BulletinsPayload.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Set; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.Unmarshaller; -import javax.xml.bind.annotation.XmlRootElement; -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.jaxb.BulletinAdapter; -import org.apache.nifi.reporting.Bulletin; - -/** - * The payload of the bulletins. - * - * @author unattributed - */ -@XmlRootElement -public class BulletinsPayload { - - private static final JAXBContext JAXB_CONTEXT; - - static { - try { - JAXB_CONTEXT = JAXBContext.newInstance(BulletinsPayload.class); - } catch (JAXBException e) { - throw new RuntimeException("Unable to create JAXBContext."); - } - } - - private Set<Bulletin> bulletins; - - @XmlJavaTypeAdapter(BulletinAdapter.class) - public Set<Bulletin> getBulletins() { - return bulletins; - } - - public void setBulletins(final Set<Bulletin> bulletins) { - this.bulletins = bulletins; - } - - public byte[] marshal() throws ProtocolException { - final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream(); - marshal(this, payloadBytes); - return payloadBytes.toByteArray(); - } - - public static void marshal(final BulletinsPayload payload, final OutputStream os) throws ProtocolException { - try { - final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); - marshaller.marshal(payload, os); - } catch (final JAXBException je) { - throw new ProtocolException(je); - } - } - - public static BulletinsPayload unmarshal(final InputStream is) throws ProtocolException { - try { - final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); - return (BulletinsPayload) unmarshaller.unmarshal(is); - } catch (final JAXBException je) { - throw new ProtocolException(je); - } - } - - public static BulletinsPayload unmarshal(final byte[] bytes) throws ProtocolException { - try { - final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); - return (BulletinsPayload) unmarshaller.unmarshal(new ByteArrayInputStream(bytes)); - } catch (final JAXBException je) { - throw new ProtocolException(je); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java deleted file mode 100644 index 986e904..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/ConnectionException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -/** - * Represents the exceptional case when connection to the cluster fails. - * - * @author unattributed - */ -public class ConnectionException extends RuntimeException { - - private static final long serialVersionUID = -1378294897231234028L; - - public ConnectionException() { - } - - public ConnectionException(String msg) { - super(msg); - } - - public ConnectionException(Throwable cause) { - super(cause); - } - - public ConnectionException(String msg, Throwable cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java deleted file mode 100644 index 55707f3..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/DisconnectionException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -/** - * Represents the exceptional case when disconnection from the cluster fails. - * - * @author unattributed - */ -public class DisconnectionException extends RuntimeException { - - private static final long serialVersionUID = 6648876367997026125L; - - public DisconnectionException() { - } - - public DisconnectionException(String msg) { - super(msg); - } - - public DisconnectionException(Throwable cause) { - super(cause); - } - - public DisconnectionException(String msg, Throwable cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java deleted file mode 100644 index 093b238..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.Unmarshaller; -import javax.xml.bind.annotation.XmlRootElement; -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.controller.Counter; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.diagnostics.SystemDiagnostics; -import org.apache.nifi.jaxb.CounterAdapter; - -/** - * The payload of the heartbeat. The payload contains status to inform the - * cluster manager the current workload of this node. - * - * @author unattributed - */ -@XmlRootElement -public class HeartbeatPayload { - - private static final JAXBContext JAXB_CONTEXT; - - static { - try { - JAXB_CONTEXT = JAXBContext.newInstance(HeartbeatPayload.class); - } catch (JAXBException e) { - throw new RuntimeException("Unable to create JAXBContext."); - } - } - - private List<Counter> counters; - private ProcessGroupStatus processGroupStatus; - private int activeThreadCount; - private long totalFlowFileCount; - private long totalFlowFileBytes; - private SystemDiagnostics systemDiagnostics; - private Integer siteToSitePort; - private boolean siteToSiteSecure; - private long systemStartTime; - - @XmlJavaTypeAdapter(CounterAdapter.class) - public List<Counter> getCounters() { - return counters; - } - - public void setCounters(final List<Counter> counters) { - this.counters = counters; - } - - public int getActiveThreadCount() { - return activeThreadCount; - } - - public void setActiveThreadCount(final int activeThreadCount) { - this.activeThreadCount = activeThreadCount; - } - - public long getTotalFlowFileCount() { - return totalFlowFileCount; - } - - public void setTotalFlowFileCount(final long totalFlowFileCount) { - this.totalFlowFileCount = totalFlowFileCount; - } - - public long getTotalFlowFileBytes() { - return totalFlowFileBytes; - } - - public void setTotalFlowFileBytes(final long totalFlowFileBytes) { - this.totalFlowFileBytes = totalFlowFileBytes; - } - - public ProcessGroupStatus getProcessGroupStatus() { - return processGroupStatus; - } - - public void setProcessGroupStatus(final ProcessGroupStatus processGroupStatus) { - this.processGroupStatus = processGroupStatus; - } - - public SystemDiagnostics getSystemDiagnostics() { - return systemDiagnostics; - } - - public void setSystemDiagnostics(final SystemDiagnostics systemDiagnostics) { - this.systemDiagnostics = systemDiagnostics; - } - - public boolean isSiteToSiteSecure() { - return siteToSiteSecure; - } - - public void setSiteToSiteSecure(final boolean secure) { - this.siteToSiteSecure = secure; - } - - public Integer getSiteToSitePort() { - return siteToSitePort; - } - - public void setSiteToSitePort(final Integer port) { - this.siteToSitePort = port; - } - - public long getSystemStartTime() { - return systemStartTime; - } - - public void setSystemStartTime(final long systemStartTime) { - this.systemStartTime = systemStartTime; - } - - public byte[] marshal() throws ProtocolException { - final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream(); - marshal(this, payloadBytes); - return payloadBytes.toByteArray(); - } - - public static void marshal(final HeartbeatPayload payload, final OutputStream os) throws ProtocolException { - try { - final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); - marshaller.marshal(payload, os); - } catch (final JAXBException je) { - throw new ProtocolException(je); - } - } - - public static HeartbeatPayload unmarshal(final InputStream is) throws ProtocolException { - try { - final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); - return (HeartbeatPayload) unmarshaller.unmarshal(is); - } catch (final JAXBException je) { - throw new ProtocolException(je); - } - } - - public static HeartbeatPayload unmarshal(final byte[] bytes) throws ProtocolException { - try { - final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); - return (HeartbeatPayload) unmarshaller.unmarshal(new ByteArrayInputStream(bytes)); - } catch (final JAXBException je) { - throw new ProtocolException(je); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java deleted file mode 100644 index f0739c2..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/LocalPort.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.connectable; - -import org.apache.nifi.connectable.ConnectableType; -import org.apache.nifi.connectable.Connection; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.AbstractPort; -import org.apache.nifi.controller.ProcessScheduler; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.scheduling.SchedulingStrategy; - -/** - * Provides a mechanism by which <code>FlowFile</code>s can be transferred into - * and out of a <code>ProcessGroup</code> to and/or from another - * <code>ProcessGroup</code> within the same instance of NiFi. - */ -public class LocalPort extends AbstractPort { - - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - private final Lock readLock = rwLock.readLock(); - private final Lock writeLock = rwLock.writeLock(); - - public LocalPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) { - super(id, name, processGroup, type, scheduler); - } - - @Override - public boolean isValid() { - return !getConnections(Relationship.ANONYMOUS).isEmpty(); - } - - @Override - public Collection<ValidationResult> getValidationErrors() { - final Collection<ValidationResult> validationErrors = new ArrayList<>(); - if (!isValid()) { - final ValidationResult error = new ValidationResult.Builder() - .explanation(String.format("Output connection for port '%s' is not defined.", getName())) - .subject(String.format("Port '%s'", getName())) - .valid(false) - .build(); - validationErrors.add(error); - } - return validationErrors; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - readLock.lock(); - try { - final List<FlowFile> flowFiles = session.get(10); - if (flowFiles.isEmpty()) { - context.yield(); - } else { - session.transfer(flowFiles, Relationship.ANONYMOUS); - } - } finally { - readLock.unlock(); - } - } - - @Override - public void updateConnection(final Connection connection) throws IllegalStateException { - writeLock.lock(); - try { - super.updateConnection(connection); - } finally { - writeLock.unlock(); - } - } - - @Override - public void addConnection(final Connection connection) throws IllegalArgumentException { - writeLock.lock(); - try { - super.addConnection(connection); - } finally { - writeLock.unlock(); - } - } - - @Override - public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException { - writeLock.lock(); - try { - super.removeConnection(connection); - } finally { - writeLock.unlock(); - } - } - - @Override - public Set<Connection> getConnections() { - readLock.lock(); - try { - return super.getConnections(); - } finally { - readLock.unlock(); - } - } - - @Override - public Set<Connection> getConnections(Relationship relationship) { - readLock.lock(); - try { - return super.getConnections(relationship); - } finally { - readLock.unlock(); - } - } - - @Override - public List<Connection> getIncomingConnections() { - readLock.lock(); - try { - return super.getIncomingConnections(); - } finally { - readLock.unlock(); - } - } - - @Override - public boolean hasIncomingConnection() { - readLock.lock(); - try { - return super.hasIncomingConnection(); - } finally { - readLock.unlock(); - } - } - - @Override - public boolean isTriggerWhenEmpty() { - return false; - } - - @Override - public SchedulingStrategy getSchedulingStrategy() { - return SchedulingStrategy.TIMER_DRIVEN; - } - - @Override - public boolean isSideEffectFree() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java deleted file mode 100644 index 1d723b5..0000000 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.connectable; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.nifi.controller.FlowFileQueue; -import org.apache.nifi.controller.ProcessScheduler; -import org.apache.nifi.controller.StandardFlowFileQueue; -import org.apache.nifi.controller.repository.FlowFileRecord; -import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.processor.FlowFileFilter; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.util.NiFiProperties; - -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; - -/** - * Models a connection between connectable components. A connection may contain - * one or more relationships that map the source component to the destination - * component. - */ -public final class StandardConnection implements Connection { - - private final String id; - private final AtomicReference<ProcessGroup> processGroup; - private final AtomicReference<String> name; - private final AtomicReference<List<Position>> bendPoints; - private final Connectable source; - private final AtomicReference<Connectable> destination; - private final AtomicReference<Collection<Relationship>> relationships; - private final StandardFlowFileQueue flowFileQueue; - private final AtomicInteger labelIndex = new AtomicInteger(1); - private final AtomicLong zIndex = new AtomicLong(0L); - private final ProcessScheduler scheduler; - private final int hashCode; - - private StandardConnection(final Builder builder) { - id = builder.id; - name = new AtomicReference<>(builder.name); - bendPoints = new AtomicReference<>(Collections.unmodifiableList(new ArrayList<>(builder.bendPoints))); - processGroup = new AtomicReference<>(builder.processGroup); - source = builder.source; - destination = new AtomicReference<>(builder.destination); - relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships)); - scheduler = builder.scheduler; - flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, NiFiProperties.getInstance().getQueueSwapThreshold()); - hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); - } - - public ProcessGroup getProcessGroup() { - return processGroup.get(); - } - - public String getIdentifier() { - return id; - } - - public String getName() { - return name.get(); - } - - public void setName(final String name) { - this.name.set(name); - } - - @Override - public List<Position> getBendPoints() { - return bendPoints.get(); - } - - @Override - public void setBendPoints(final List<Position> position) { - this.bendPoints.set(Collections.unmodifiableList(new ArrayList<>(position))); - } - - public int getLabelIndex() { - return labelIndex.get(); - } - - public void setLabelIndex(final int labelIndex) { - this.labelIndex.set(labelIndex); - } - - @Override - public long getZIndex() { - return zIndex.get(); - } - - @Override - public void setZIndex(final long zIndex) { - this.zIndex.set(zIndex); - } - - public Connectable getSource() { - return source; - } - - public Connectable getDestination() { - return destination.get(); - } - - public Collection<Relationship> getRelationships() { - return relationships.get(); - } - - public FlowFileQueue getFlowFileQueue() { - return flowFileQueue; - } - - public void setProcessGroup(final ProcessGroup newGroup) { - final ProcessGroup currentGroup = this.processGroup.get(); - try { - this.processGroup.set(newGroup); - } catch (final RuntimeException e) { - this.processGroup.set(currentGroup); - throw e; - } - } - - public void setRelationships(final Collection<Relationship> newRelationships) { - final Collection<Relationship> currentRelationships = relationships.get(); - if (currentRelationships.equals(newRelationships)) { - return; - } - - if (getSource().isRunning()) { - throw new IllegalStateException("Cannot update the relationships for Connection because the source of the Connection is running"); - } - - try { - this.relationships.set(new ArrayList<>(newRelationships)); - getSource().updateConnection(this); - } catch (final RuntimeException e) { - this.relationships.set(currentRelationships); - throw e; - } - } - - public void setDestination(final Connectable newDestination) { - final Connectable previousDestination = destination.get(); - if (previousDestination.equals(newDestination)) { - return; - } - - if (previousDestination.isRunning() && !(previousDestination instanceof Funnel || previousDestination instanceof LocalPort)) { - throw new IllegalStateException("Cannot change destination of Connection because the current destination is running"); - } - - try { - previousDestination.removeConnection(this); - this.destination.set(newDestination); - getSource().updateConnection(this); - - newDestination.addConnection(this); - scheduler.registerEvent(newDestination); - } catch (final RuntimeException e) { - this.destination.set(previousDestination); - throw e; - } - } - - @Override - public void lock() { - flowFileQueue.lock(); - } - - @Override - public void unlock() { - flowFileQueue.unlock(); - } - - @Override - public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) { - return flowFileQueue.poll(filter, expiredRecords); - } - - @Override - public boolean equals(final Object other) { - if (!(other instanceof Connection)) { - return false; - } - final Connection con = (Connection) other; - return new EqualsBuilder().append(id, con.getIdentifier()).isEquals(); - } - - @Override - public int hashCode() { - return hashCode; - } - - @Override - public String toString() { - return "Connection[ID=" + id + ",Name=" + name.get() + ",Source=" + getSource() + ",Destination=" + getDestination() + ",Relationships=" + getRelationships(); - } - - /** - * Gives this Connection ownership of the given FlowFile and allows the - * Connection to hold on to the FlowFile but NOT provide the FlowFile to - * consumers. This allows us to ensure that the Connection is not deleted - * during the middle of a Session commit. - * - * @param flowFile - */ - @Override - public void enqueue(final FlowFileRecord flowFile) { - flowFileQueue.put(flowFile); - } - - @Override - public void enqueue(final Collection<FlowFileRecord> flowFiles) { - flowFileQueue.putAll(flowFiles); - } - - public static class Builder { - - private final ProcessScheduler scheduler; - - private String id = UUID.randomUUID().toString(); - private String name; - private List<Position> bendPoints = new ArrayList<>(); - private ProcessGroup processGroup; - private Connectable source; - private Connectable destination; - private Collection<Relationship> relationships; - - public Builder(final ProcessScheduler scheduler) { - this.scheduler = scheduler; - } - - public Builder id(final String id) { - this.id = id; - return this; - } - - public Builder source(final Connectable source) { - this.source = source; - return this; - } - - public Builder processGroup(final ProcessGroup group) { - this.processGroup = group; - return this; - } - - public Builder destination(final Connectable destination) { - this.destination = destination; - return this; - } - - public Builder relationships(final Collection<Relationship> relationships) { - this.relationships = new ArrayList<>(relationships); - return this; - } - - public Builder name(final String name) { - this.name = name; - return this; - } - - public Builder bendPoints(final List<Position> bendPoints) { - this.bendPoints.clear(); - this.bendPoints.addAll(bendPoints); - return this; - } - - public Builder addBendPoint(final Position bendPoint) { - bendPoints.add(bendPoint); - return this; - } - - public StandardConnection build() { - if (source == null) { - throw new IllegalStateException("Cannot build a Connection without a Source"); - } - if (destination == null) { - throw new IllegalStateException("Cannot build a Connection without a Destination"); - } - - if (relationships == null) { - relationships = new ArrayList<>(); - } - - if (relationships.isEmpty()) { - // ensure relationships have been specified for processors, otherwise the anonymous relationship is used - if (source.getConnectableType() == ConnectableType.PROCESSOR) { - throw new IllegalStateException("Cannot build a Connection without any relationships"); - } - relationships.add(Relationship.ANONYMOUS); - } - - return new StandardConnection(this); - } - } - - @Override - public void verifyCanUpdate() { - // StandardConnection can always be updated - } - - @Override - public void verifyCanDelete() { - if (!flowFileQueue.isEmpty()) { - throw new IllegalStateException("Queue not empty for " + this); - } - - if (source.isRunning()) { - if (!ConnectableType.FUNNEL.equals(source.getConnectableType())) { - throw new IllegalStateException("Source of Connection (" + source + ") is running"); - } - } - } -}