Repository: incubator-nifi Updated Branches: refs/heads/site-to-site-client [created] fdf758460
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java deleted file mode 100644 index d4b4f61..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ /dev/null @@ -1,510 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol.socket; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.zip.CRC32; -import java.util.zip.CheckedInputStream; -import java.util.zip.CheckedOutputStream; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.remote.Peer; -import org.apache.nifi.remote.PeerStatus; -import org.apache.nifi.remote.RemoteGroupPort; -import org.apache.nifi.remote.RemoteResourceFactory; -import org.apache.nifi.remote.StandardVersionNegotiator; -import org.apache.nifi.remote.VersionNegotiator; -import org.apache.nifi.remote.codec.FlowFileCodec; -import org.apache.nifi.remote.codec.StandardFlowFileCodec; -import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.ProtocolException; -import org.apache.nifi.remote.io.CompressionInputStream; -import org.apache.nifi.remote.io.CompressionOutputStream; -import org.apache.nifi.remote.protocol.ClientProtocol; -import org.apache.nifi.remote.protocol.CommunicationsSession; -import org.apache.nifi.remote.protocol.RequestType; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.StopWatch; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SocketClientProtocol implements ClientProtocol { - private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1); - - - private RemoteGroupPort port; - private boolean useCompression; - - private String commsIdentifier; - private boolean handshakeComplete = false; - - private final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class); - - private Response handshakeResponse = null; - private boolean readyForFileTransfer = false; - private String transitUriPrefix = null; - - private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds - - public SocketClientProtocol() { - } - - public void setPort(final RemoteGroupPort port) { - this.port = port; - this.useCompression = port.isUseCompression(); - } - - @Override - public void handshake(final Peer peer) throws IOException, HandshakeException { - if ( handshakeComplete ) { - throw new IllegalStateException("Handshake has already been completed"); - } - commsIdentifier = UUID.randomUUID().toString(); - logger.debug("{} handshaking with {}", this, peer); - - final Map<HandshakeProperty, String> properties = new HashMap<>(); - properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression)); - properties.put(HandshakeProperty.PORT_IDENTIFIER, port.getIdentifier()); - properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf( - port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS)) ); - - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - commsSession.setTimeout(port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS)); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - - dos.writeUTF(commsIdentifier); - - if ( versionNegotiator.getVersion() >= 3 ) { - dos.writeUTF(peer.getUrl()); - transitUriPrefix = peer.getUrl(); - - if ( !transitUriPrefix.endsWith("/") ) { - transitUriPrefix = transitUriPrefix + "/"; - } - } - - dos.writeInt(properties.size()); - for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) { - dos.writeUTF(entry.getKey().name()); - dos.writeUTF(entry.getValue()); - } - - dos.flush(); - - try { - handshakeResponse = Response.read(dis); - } catch (final ProtocolException e) { - throw new HandshakeException(e); - } - - switch (handshakeResponse.getCode()) { - case PORT_NOT_IN_VALID_STATE: - case UNKNOWN_PORT: - case PORTS_DESTINATION_FULL: - break; - case PROPERTIES_OK: - readyForFileTransfer = true; - break; - default: - logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[] { - this, handshakeResponse, peer}); - peer.close(); - throw new HandshakeException("Received unexpected response " + handshakeResponse); - } - - logger.debug("{} Finished handshake with {}", this, peer); - handshakeComplete = true; - } - - public boolean isReadyForFileTransfer() { - return readyForFileTransfer; - } - - public boolean isPortInvalid() { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not completed successfully"); - } - return handshakeResponse.getCode() == ResponseCode.PORT_NOT_IN_VALID_STATE; - } - - public boolean isPortUnknown() { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not completed successfully"); - } - return handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT; - } - - public boolean isDestinationFull() { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not completed successfully"); - } - return handshakeResponse.getCode() == ResponseCode.PORTS_DESTINATION_FULL; - } - - @Override - public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not been performed"); - } - - logger.debug("{} Get Peer Statuses from {}", this, peer); - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - - RequestType.REQUEST_PEER_LIST.writeRequestType(dos); - dos.flush(); - final int numPeers = dis.readInt(); - final Set<PeerStatus> peers = new HashSet<>(numPeers); - for (int i=0; i < numPeers; i++) { - final String hostname = dis.readUTF(); - final int port = dis.readInt(); - final boolean secure = dis.readBoolean(); - final int flowFileCount = dis.readInt(); - peers.add(new PeerStatus(hostname, port, secure, flowFileCount)); - } - - logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer); - return peers; - } - - @Override - public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not been performed"); - } - - logger.debug("{} Negotiating Codec with {}", this, peer); - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - - RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos); - - FlowFileCodec codec = new StandardFlowFileCodec(); - try { - codec = (FlowFileCodec) RemoteResourceFactory.initiateResourceNegotiation(codec, dis, dos); - } catch (HandshakeException e) { - throw new ProtocolException(e.toString()); - } - logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, codec, commsSession}); - - return codec; - } - - - @Override - public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not been performed"); - } - if ( !readyForFileTransfer ) { - throw new IllegalStateException("Cannot receive files; handshake resolution was " + handshakeResponse); - } - - logger.debug("{} Receiving FlowFiles from {}", this, peer); - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - String userDn = commsSession.getUserDn(); - if ( userDn == null ) { - userDn = "none"; - } - - // Indicate that we would like to have some data - RequestType.RECEIVE_FLOWFILES.writeRequestType(dos); - dos.flush(); - - // Determine if Peer will send us data or has no data to send us - final Response dataAvailableCode = Response.read(dis); - switch (dataAvailableCode.getCode()) { - case MORE_DATA: - logger.debug("{} {} Indicates that data is available", this, peer); - break; - case NO_MORE_DATA: - logger.debug("{} No data available from {}", peer); - return; - default: - throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); - } - - final StopWatch stopWatch = new StopWatch(true); - final Set<FlowFile> flowFilesReceived = new HashSet<>(); - long bytesReceived = 0L; - final CRC32 crc = new CRC32(); - - // Peer has data. Decode the bytes into FlowFiles until peer says he's finished sending data. - boolean continueTransaction = true; - String calculatedCRC = ""; - while (continueTransaction) { - final InputStream flowFileInputStream = useCompression ? new CompressionInputStream(dis) : dis; - final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc); - - final long startNanos = System.nanoTime(); - FlowFile flowFile = codec.decode(checkedIn, session); - final long transmissionNanos = System.nanoTime() - startNanos; - final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS); - - final String sourceFlowFileIdentifier = flowFile.getAttribute(CoreAttributes.UUID.key()); - flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString()); - - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier; - session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transmissionMillis); - - session.transfer(flowFile, Relationship.ANONYMOUS); - bytesReceived += flowFile.getSize(); - flowFilesReceived.add(flowFile); - logger.debug("{} Received {} from {}", this, flowFile, peer); - - final Response transactionCode = Response.read(dis); - switch (transactionCode.getCode()) { - case CONTINUE_TRANSACTION: - logger.trace("{} Received ContinueTransaction indicator from {}", this, peer); - break; - case FINISH_TRANSACTION: - logger.trace("{} Received FinishTransaction indicator from {}", this, peer); - continueTransaction = false; - calculatedCRC = String.valueOf(checkedIn.getChecksum().getValue()); - break; - default: - throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionCode); - } - } - - // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message - // to peer so that we can verify that the connection is still open. This is a two-phase commit, - // which helps to prevent the chances of data duplication. Without doing this, we may commit the - // session and then when we send the response back to the peer, the peer may have timed out and may not - // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the - // Critical Section involved in this transaction so that rather than the Critical Section being the - // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. - logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); - ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); - - final Response confirmTransactionResponse = Response.read(dis); - logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer); - - switch (confirmTransactionResponse.getCode()) { - case CONFIRM_TRANSACTION: - break; - case BAD_CHECKSUM: - session.rollback(); - throw new IOException(this + " Received a BadChecksum response from peer " + peer); - default: - throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); - } - - // Commit the session so that we have persisted the data - session.commit(); - - if ( context.getAvailableRelationships().isEmpty() ) { - // Confirm that we received the data and the peer can now discard it but that the peer should not - // send any more data for a bit - logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); - ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos); - } else { - // Confirm that we received the data and the peer can now discard it - logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); - ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); - } - - stopWatch.stop(); - final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; - final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesReceived); - logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); - } - - @Override - public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not been performed"); - } - if ( !readyForFileTransfer ) { - throw new IllegalStateException("Cannot transfer files; handshake resolution was " + handshakeResponse); - } - - FlowFile flowFile = session.get(); - if ( flowFile == null ) { - return; - } - - logger.debug("{} Sending FlowFiles to {}", this, peer); - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - String userDn = commsSession.getUserDn(); - if ( userDn == null ) { - userDn = "none"; - } - - // Indicate that we would like to have some data - RequestType.SEND_FLOWFILES.writeRequestType(dos); - dos.flush(); - - final StopWatch stopWatch = new StopWatch(true); - final CRC32 crc = new CRC32(); - - long bytesSent = 0L; - final Set<FlowFile> flowFilesSent = new HashSet<>(); - boolean continueTransaction = true; - String calculatedCRC = ""; - final long startSendingNanos = System.nanoTime(); - while (continueTransaction) { - final OutputStream flowFileOutputStream = useCompression ? new CompressionOutputStream(dos) : dos; - logger.debug("{} Sending {} to {}", this, flowFile, peer); - - final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc); - - final long startNanos = System.nanoTime(); - flowFile = codec.encode(flowFile, session, checkedOutStream); - final long transferNanos = System.nanoTime() - startNanos; - final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); - - // need to close the CompressionOutputStream in order to force it write out any remaining bytes. - // Otherwise, do NOT close it because we don't want to close the underlying stream - // (CompressionOutputStream will not close the underlying stream when it's closed) - if ( useCompression ) { - checkedOutStream.close(); - } - - flowFilesSent.add(flowFile); - bytesSent += flowFile.getSize(); - logger.debug("{} Sent {} to {}", this, flowFile, peer); - - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); - session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false); - session.remove(flowFile); - - final long sendingNanos = System.nanoTime() - startSendingNanos; - if ( sendingNanos < BATCH_SEND_NANOS ) { - flowFile = session.get(); - } else { - flowFile = null; - } - - continueTransaction = (flowFile != null); - if ( continueTransaction ) { - logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", this, peer); - ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); - } else { - logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer); - ResponseCode.FINISH_TRANSACTION.writeResponse(dos); - - calculatedCRC = String.valueOf( checkedOutStream.getChecksum().getValue() ); - } - } - - // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response - final Response transactionConfirmationResponse = Response.read(dis); - if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) { - // Confirm checksum and echo back the confirmation. - logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer); - final String receivedCRC = transactionConfirmationResponse.getMessage(); - - if ( versionNegotiator.getVersion() > 3 ) { - if ( !receivedCRC.equals(calculatedCRC) ) { - ResponseCode.BAD_CHECKSUM.writeResponse(dos); - session.rollback(); - throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session"); - } - } - - ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, ""); - } else { - throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); - } - - final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; - - final Response transactionResponse; - try { - transactionResponse = Response.read(dis); - } catch (final IOException e) { - logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." + - " It is unknown whether or not the peer successfully received/processed the data." + - " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", - this, peer, session, flowFileDescription); - session.rollback(); - throw e; - } - - logger.debug("{} Received {} from {}", this, transactionResponse, peer); - if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { - peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS)); - } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { - throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); - } - - // consume input stream entirely, ignoring its contents. If we - // don't do this, the Connection will not be returned to the pool - stopWatch.stop(); - final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesSent); - - session.commit(); - - logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); - } - - @Override - public VersionNegotiator getVersionNegotiator() { - return versionNegotiator; - } - - @Override - public void shutdown(final Peer peer) throws IOException { - readyForFileTransfer = false; - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - - logger.debug("{} Shutting down with {}", this, peer); - // Indicate that we would like to have some data - RequestType.SHUTDOWN.writeRequestType(dos); - dos.flush(); - } - - @Override - public String getResourceName() { - return "SocketFlowFileProtocol"; - } - - @Override - public String toString() { - return "SocketClientProtocol[CommsID=" + commsIdentifier + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index 5edd4f9..647b45c 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -32,7 +32,6 @@ import java.util.zip.CRC32; import java.util.zip.CheckedInputStream; import java.util.zip.CheckedOutputStream; -import org.apache.nifi.cluster.NodeInformant; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Port; import org.apache.nifi.flowfile.FlowFile; @@ -47,6 +46,7 @@ import org.apache.nifi.remote.RemoteResourceFactory; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.StandardVersionNegotiator; import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.ProtocolException; @@ -58,7 +58,6 @@ import org.apache.nifi.remote.protocol.ServerProtocol; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.StopWatch; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java index e074010..7474d38 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java @@ -18,14 +18,14 @@ package org.apache.nifi.remote; import org.apache.nifi.remote.StandardRemoteGroupPort; import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; + import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.nifi.cluster.ClusterNodeInformation; -import org.apache.nifi.cluster.NodeInformation; import org.apache.nifi.connectable.ConnectableType; - import org.junit.Test; public class TestStandardRemoteGroupPort { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/pom.xml b/nifi/nar-bundles/framework-bundle/pom.xml index b900c6d..b9368e8 100644 --- a/nifi/nar-bundles/framework-bundle/pom.xml +++ b/nifi/nar-bundles/framework-bundle/pom.xml @@ -109,6 +109,11 @@ <artifactId>web-optimistic-locking</artifactId> <version>0.0.1-incubating-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>site-to-site-client</artifactId> + <version>0.0.1-incubating-SNAPSHOT</version> + </dependency> </dependencies> </dependencyManagement> </project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java new file mode 100644 index 0000000..94de86b --- /dev/null +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +public interface RemoteDestination { + + String getDescription(); + + String getIdentifier(); + + URI getTargetUri(); + + boolean isSecure(); + + long getCommunicationsTimeout(TimeUnit timeUnit); + + long getYieldPeriod(TimeUnit timeUnit); + + boolean isUseCompression(); +}