http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java index ec169ad..cb2d76d 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java @@ -35,8 +35,8 @@ import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLContext; -import org.apache.nifi.cluster.NodeInformant; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java index b0d88d4..53f998e 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java @@ -18,6 +18,7 @@ package org.apache.nifi.remote; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; @@ -44,18 +45,20 @@ import javax.net.ssl.SSLContext; import javax.security.cert.CertificateExpiredException; import javax.security.cert.CertificateNotYetValidException; -import org.apache.nifi.cluster.ClusterNodeInformation; -import org.apache.nifi.cluster.NodeInformation; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; -import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.remote.client.socket.EndpointConnectionState; +import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.BadRequestException; import org.apache.nifi.remote.exception.HandshakeException; @@ -71,35 +74,28 @@ import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; - +import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.sun.jersey.api.client.ClientHandlerException; -public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroupPort { +public class StandardRemoteGroupPort extends RemoteGroupPort { public static final String USER_AGENT = "NiFi-Site-to-Site"; public static final String CONTENT_TYPE = "application/octet-stream"; public static final int GZIP_COMPRESSION_LEVEL = 1; - public static final long PEER_REFRESH_PERIOD = 60000L; private static final String CATEGORY = "Site to Site"; private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class); private final RemoteProcessGroup remoteGroup; - private final SSLContext sslContext; private final AtomicBoolean useCompression = new AtomicBoolean(false); private final AtomicBoolean targetExists = new AtomicBoolean(true); private final AtomicBoolean targetRunning = new AtomicBoolean(true); - private final AtomicLong peerIndex = new AtomicLong(0L); - - private volatile List<PeerStatus> peerStatuses; - private volatile long peerRefreshTime = 0L; - private final ReentrantLock peerRefreshLock = new ReentrantLock(); + private final TransferDirection transferDirection; - private final ConcurrentMap<String, BlockingQueue<EndpointConnectionState>> endpointConnectionMap = new ConcurrentHashMap<>(); - private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>(); + private final EndpointConnectionStatePool connectionStatePool; private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>(); private final Lock interruptLock = new ReentrantLock(); @@ -113,8 +109,17 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup super(id, name, processGroup, type, scheduler); this.remoteGroup = remoteGroup; - this.sslContext = sslContext; + this.transferDirection = direction; setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos"); + + final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory(); + final File persistenceFile = new File(stateDir, remoteGroup.getIdentifier() + ".peers"); + + // TODO: This should really be constructed in the RemoteProcessGroup and made available to all ports in + // that remote process group. This prevents too many connections from being made and also protects the persistenceFile + // so that only a single thread will ever attempt to write to the file at once. + FIXME(); + connectionStatePool = new EndpointConnectionStatePool(sslContext, remoteGroup.getEventReporter(), persistenceFile); } @Override @@ -133,28 +138,15 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup @Override public void shutdown() { - super.shutdown(); - - peerTimeoutExpirations.clear(); + super.shutdown(); interruptLock.lock(); try { this.shutdown = true; - - for ( final CommunicationsSession commsSession : activeCommsChannels ) { - commsSession.interrupt(); - } - - for ( final BlockingQueue<EndpointConnectionState> queue : endpointConnectionMap.values() ) { - EndpointConnectionState state; - while ( (state = queue.poll()) != null) { - cleanup(state.getSocketClientProtocol(), state.getPeer()); - } - } - - endpointConnectionMap.clear(); } finally { interruptLock.unlock(); } + + connectionStatePool.shutdown(); } @Override @@ -171,31 +163,7 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup void cleanupSockets() { - final List<EndpointConnectionState> states = new ArrayList<>(); - - for ( final BlockingQueue<EndpointConnectionState> queue : endpointConnectionMap.values() ) { - states.clear(); - - EndpointConnectionState state; - while ((state = queue.poll()) != null) { - // If the socket has not been used in 10 seconds, shut it down. - final long lastUsed = state.getLastTimeUsed(); - if ( lastUsed < System.currentTimeMillis() - 10000L ) { - try { - state.getSocketClientProtocol().shutdown(state.getPeer()); - } catch (final Exception e) { - logger.debug("Failed to shut down {} using {} due to {}", - new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} ); - } - - cleanup(state.getSocketClientProtocol(), state.getPeer()); - } else { - states.add(state); - } - } - - queue.addAll(states); - } + connectionStatePool.cleanupExpiredSockets(); } @@ -212,137 +180,46 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup } String url = getRemoteProcessGroup().getTargetUri().toString(); - Peer peer = null; - final PeerStatus peerStatus = getNextPeerStatus(); - if ( peerStatus == null ) { - logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", this); + + final EndpointConnectionState connectionState; + try { + connectionState = connectionStatePool.getEndpointConnectionState(url, this, transferDirection); + } catch (final PortNotRunningException e) { context.yield(); + this.targetRunning.set(false); + final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", this, url); + logger.error(message); + remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); return; - } - - url = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort(); - - // - // Attempt to get a connection state that already exists for this URL. - // - BlockingQueue<EndpointConnectionState> connectionStateQueue = endpointConnectionMap.get(url); - if ( connectionStateQueue == null ) { - connectionStateQueue = new LinkedBlockingQueue<>(); - BlockingQueue<EndpointConnectionState> existingQueue = endpointConnectionMap.putIfAbsent(url, connectionStateQueue); - if ( existingQueue != null ) { - connectionStateQueue = existingQueue; + } catch (final UnknownPortException e) { + context.yield(); + this.targetExists.set(false); + final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", this, url); + logger.error(message); + remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); + return; + } catch (final ProtocolException | HandshakeException | IOException e) { + final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString()); + logger.error(message); + if ( logger.isDebugEnabled() ) { + logger.error("", e); } + remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); + session.rollback(); + return; } - FlowFileCodec codec = null; - CommunicationsSession commsSession = null; - SocketClientProtocol protocol = null; - EndpointConnectionState connectionState; + if ( connectionState == null ) { + logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", this); + context.yield(); + return; + } - do { - connectionState = connectionStateQueue.poll(); - logger.debug("{} Connection State for {} = {}", this, url, connectionState); - - // if we can't get an existing ConnectionState, create one - if ( connectionState == null ) { - protocol = new SocketClientProtocol(); - protocol.setPort(this); - - try { - commsSession = establishSiteToSiteConnection(peerStatus); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - try { - RemoteResourceFactory.initiateResourceNegotiation(protocol, dis, dos); - } catch (final HandshakeException e) { - try { - commsSession.close(); - } catch (final IOException ioe) { - final String message = String.format("%s unable to close communications session %s due to %s; resources may not be appropriately cleaned up", - this, commsSession, ioe.toString()); - logger.error(message); - remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); - } - } - } catch (final IOException e) { - final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? url : peer, e.toString()); - logger.error(message); - if ( logger.isDebugEnabled() ) { - logger.error("", e); - } - remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); - session.rollback(); - return; - } - - - peer = new Peer(commsSession, url); - - // perform handshake - try { - protocol.handshake(peer); - - // handle error cases - if ( protocol.isDestinationFull() ) { - logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer); - penalize(peer); - cleanup(protocol, peer); - return; - } else if ( protocol.isPortInvalid() ) { - penalize(peer); - context.yield(); - cleanup(protocol, peer); - this.targetRunning.set(false); - final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", this, peer); - logger.error(message); - remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); - return; - } else if ( protocol.isPortUnknown() ) { - penalize(peer); - context.yield(); - cleanup(protocol, peer); - this.targetExists.set(false); - final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", this, peer); - logger.error(message); - remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); - return; - } - - // negotiate the FlowFileCodec to use - codec = protocol.negotiateCodec(peer); - } catch (final Exception e) { - penalize(peer); - cleanup(protocol, peer); - - final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? url : peer, e.toString()); - logger.error(message); - if ( logger.isDebugEnabled() ) { - logger.error("", e); - } - remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message); - session.rollback(); - return; - } - - connectionState = new EndpointConnectionState(peer, protocol, codec); - } else { - final long lastTimeUsed = connectionState.getLastTimeUsed(); - final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed; - final long timeoutMillis = remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS); - - if ( timeoutMillis > 0L && millisSinceLastUse >= timeoutMillis ) { - cleanup(connectionState.getSocketClientProtocol(), connectionState.getPeer()); - connectionState = null; - } else { - codec = connectionState.getCodec(); - peer = connectionState.getPeer(); - commsSession = peer.getCommunicationsSession(); - protocol = connectionState.getSocketClientProtocol(); - } - } - } while ( connectionState == null || codec == null || commsSession == null || protocol == null ); + FlowFileCodec codec = connectionState.getCodec(); + SocketClientProtocol protocol = connectionState.getSocketClientProtocol(); + final Peer peer = connectionState.getPeer(); + url = peer.getUrl(); - try { interruptLock.lock(); try { @@ -361,11 +238,6 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup receiveFlowFiles(peer, protocol, context, session, codec); } - if ( peer.isPenalized() ) { - logger.debug("{} {} was penalized", this, peer); - penalize(peer); - } - interruptLock.lock(); try { if ( shutdown ) { @@ -380,12 +252,12 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup session.commit(); connectionState.setLastTimeUsed(); - connectionStateQueue.add(connectionState); + connectionStatePool.offer(connectionState); } catch (final TransmissionDisabledException e) { cleanup(protocol, peer); session.rollback(); } catch (final Exception e) { - penalize(peer); + connectionStatePool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS)); final String message = String.format("%s failed to communicate with %s (%s) due to %s", this, peer == null ? url : peer, protocol, e.toString()); logger.error(message); @@ -401,34 +273,6 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup } - /** - * Updates internal state map to penalize a PeerStatus that points to the specified peer - * @param peer - */ - private void penalize(final Peer peer) { - String host; - int port; - try { - final URI uri = new URI(peer.getUrl()); - host = uri.getHost(); - port = uri.getPort(); - } catch (final URISyntaxException e) { - host = peer.getHost(); - port = -1; - } - - final PeerStatus status = new PeerStatus(host, port, true, 1); - Long expiration = peerTimeoutExpirations.get(status); - if ( expiration == null ) { - expiration = Long.valueOf(0L); - } - - final long penalizationMillis = getYieldPeriod(TimeUnit.MILLISECONDS); - final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis); - peerTimeoutExpirations.put(status, Long.valueOf(newExpiration)); - } - - private void cleanup(final SocketClientProtocol protocol, final Peer peer) { if ( protocol != null && peer != null ) { try { @@ -457,108 +301,6 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup return remoteGroup.getYieldDuration(); } - public CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException { - final String destinationUri = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort(); - - CommunicationsSession commsSession = null; - try { - if ( peerStatus.isSecure() ) { - if ( sslContext == null ) { - throw new IOException("Unable to communicate with " + peerStatus.getHostname() + ":" + peerStatus.getPort() + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications"); - } - - final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, peerStatus.getHostname(), peerStatus.getPort(), true); - socketChannel.connect(); - - commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri); - - try { - commsSession.setUserDn(socketChannel.getDn()); - } catch (final CertificateNotYetValidException | CertificateExpiredException ex) { - throw new IOException(ex); - } - } else { - final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(peerStatus.getHostname(), peerStatus.getPort())); - commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri); - } - - commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES); - - commsSession.setUri(destinationUri); - } catch (final IOException ioe) { - if ( commsSession != null ) { - commsSession.close(); - } - - throw ioe; - } - - return commsSession; - } - - private PeerStatus getNextPeerStatus() { - List<PeerStatus> peerList = peerStatuses; - if ( (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) && peerRefreshLock.tryLock() ) { - try { - try { - peerList = createPeerStatusList(); - } catch (final IOException | BadRequestException | HandshakeException | UnknownPortException | PortNotRunningException | ClientHandlerException e) { - final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString()); - logger.warn(message); - if ( logger.isDebugEnabled() ) { - logger.warn("", e); - } - remoteGroup.getEventReporter().reportEvent(Severity.WARNING, CATEGORY, message); - } - - this.peerStatuses = peerList; - peerRefreshTime = System.currentTimeMillis(); - } finally { - peerRefreshLock.unlock(); - } - } - - if ( peerList == null || peerList.isEmpty() ) { - return null; - } - - PeerStatus peerStatus; - for (int i=0; i < peerList.size(); i++) { - final long idx = peerIndex.getAndIncrement(); - final int listIndex = (int) (idx % peerList.size()); - peerStatus = peerList.get(listIndex); - - if ( isPenalized(peerStatus) ) { - logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus); - } else { - return peerStatus; - } - } - - logger.debug("{} All peers appear to be penalized; returning null", this); - return null; - } - - private boolean isPenalized(final PeerStatus peerStatus) { - final Long expirationEnd = peerTimeoutExpirations.get(peerStatus); - return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis() ); - } - - private List<PeerStatus> createPeerStatusList() throws IOException, BadRequestException, HandshakeException, UnknownPortException, PortNotRunningException { - final Set<PeerStatus> statuses = remoteGroup.getPeerStatuses(); - if ( statuses == null ) { - return new ArrayList<>(); - } - - final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); - final List<NodeInformation> nodeInfos = new ArrayList<>(); - for ( final PeerStatus peerStatus : statuses ) { - final NodeInformation nodeInfo = new NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, peerStatus.isSecure(), peerStatus.getFlowFileCount()); - nodeInfos.add(nodeInfo); - } - clusterNodeInfo.setNodeInformation(nodeInfos); - return formulateDestinationList(clusterNodeInfo); - } private void transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { protocol.transferFlowFiles(peer, context, session, codec); @@ -568,70 +310,6 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup protocol.receiveFlowFiles(peer, context, session, codec); } - private List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo) throws IOException { - return formulateDestinationList(clusterNodeInfo, getConnectableType()); - } - - static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final ConnectableType connectableType) { - final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation(); - final int numDestinations = Math.max(128, nodeInfoSet.size()); - final Map<NodeInformation, Integer> entryCountMap = new HashMap<>(); - - long totalFlowFileCount = 0L; - for (final NodeInformation nodeInfo : nodeInfoSet) { - totalFlowFileCount += nodeInfo.getTotalFlowFiles(); - } - - int totalEntries = 0; - for (final NodeInformation nodeInfo : nodeInfoSet) { - final int flowFileCount = nodeInfo.getTotalFlowFiles(); - // don't allow any node to get more than 80% of the data - final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount)); - final double relativeWeighting = (connectableType == ConnectableType.REMOTE_INPUT_PORT) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles; - final int entries = Math.max(1, (int) (numDestinations * relativeWeighting)); - - entryCountMap.put(nodeInfo, Math.max(1, entries)); - totalEntries += entries; - } - - final List<PeerStatus> destinations = new ArrayList<>(totalEntries); - for (int i=0; i < totalEntries; i++) { - destinations.add(null); - } - for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) { - final NodeInformation nodeInfo = entry.getKey(); - final int numEntries = entry.getValue(); - - int skipIndex = numEntries; - for (int i=0; i < numEntries; i++) { - int n = (skipIndex * i); - while (true) { - final int index = n % destinations.size(); - PeerStatus status = destinations.get(index); - if ( status == null ) { - status = new PeerStatus(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), nodeInfo.getTotalFlowFiles()); - destinations.set(index, status); - break; - } else { - n++; - } - } - } - } - - final StringBuilder distributionDescription = new StringBuilder(); - distributionDescription.append("New Weighted Distribution of Nodes:"); - for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) { - final double percentage = entry.getValue() * 100D / (double) destinations.size(); - distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of FlowFiles"); - } - logger.info(distributionDescription.toString()); - - // Jumble the list of destinations. - return destinations; - } - - @Override public boolean getTargetExists() { return targetExists.get(); @@ -717,40 +395,6 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup } - private static class EndpointConnectionState { - private final Peer peer; - private final SocketClientProtocol socketClientProtocol; - private final FlowFileCodec codec; - private volatile long lastUsed; - - private EndpointConnectionState(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) { - this.peer = peer; - this.socketClientProtocol = socketClientProtocol; - this.codec = codec; - } - - public FlowFileCodec getCodec() { - return codec; - } - - public SocketClientProtocol getSocketClientProtocol() { - return socketClientProtocol; - } - - public Peer getPeer() { - return peer; - } - - public void setLastTimeUsed() { - lastUsed = System.currentTimeMillis(); - } - - public long getLastTimeUsed() { - return lastUsed; - } - } - - @Override public SchedulingStrategy getSchedulingStrategy() { return SchedulingStrategy.TIMER_DRIVEN; @@ -761,4 +405,28 @@ public class StandardRemoteGroupPort extends AbstractPort implements RemoteGroup public boolean isSideEffectFree() { return false; } + + @Override + public String getDescription() { + return toString(); + } + + @Override + public long getCommunicationsTimeout(final TimeUnit timeUnit) { + return getRemoteProcessGroup().getCommunicationsTimeout(timeUnit); + } + + @Override + public URI getTargetUri() { + return remoteGroup.getTargetUri(); + } + + @Override + public boolean isSecure() { + try { + return remoteGroup.isSecure(); + } catch (final CommunicationsException ce) { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java deleted file mode 100644 index d18a4ee..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.codec; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.remote.StandardVersionNegotiator; -import org.apache.nifi.remote.VersionNegotiator; -import org.apache.nifi.remote.exception.ProtocolException; - -public class StandardFlowFileCodec implements FlowFileCodec { - public static final int MAX_NUM_ATTRIBUTES = 25000; - - public static final String DEFAULT_FLOWFILE_PATH = "./"; - - private final VersionNegotiator versionNegotiator; - - public StandardFlowFileCodec() { - versionNegotiator = new StandardVersionNegotiator(1); - } - - @Override - public FlowFile encode(final FlowFile flowFile, final ProcessSession session, final OutputStream encodedOut) throws IOException { - final DataOutputStream out = new DataOutputStream(encodedOut); - - final Map<String, String> attributes = flowFile.getAttributes(); - out.writeInt(attributes.size()); - for ( final Map.Entry<String, String> entry : attributes.entrySet() ) { - writeString(entry.getKey(), out); - writeString(entry.getValue(), out); - } - - out.writeLong(flowFile.getSize()); - - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - final byte[] buffer = new byte[8192]; - int len; - while ( (len = in.read(buffer)) > 0 ) { - encodedOut.write(buffer, 0, len); - } - - encodedOut.flush(); - } - }); - - return flowFile; - } - - - @Override - public FlowFile decode(final InputStream stream, final ProcessSession session) throws IOException, ProtocolException { - final DataInputStream in = new DataInputStream(stream); - - final int numAttributes; - try { - numAttributes = in.readInt(); - } catch (final EOFException e) { - // we're out of data. - return null; - } - - // This is here because if the stream is not properly formed, we could get up to Integer.MAX_VALUE attributes, which will - // generally result in an OutOfMemoryError. - if ( numAttributes > MAX_NUM_ATTRIBUTES ) { - throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes); - } - - try { - final Map<String, String> attributes = new HashMap<>(numAttributes); - for (int i=0; i < numAttributes; i++) { - final String attrName = readString(in); - final String attrValue = readString(in); - attributes.put(attrName, attrValue); - } - - final long numBytes = in.readLong(); - - FlowFile flowFile = session.create(); - flowFile = session.putAllAttributes(flowFile, attributes); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - int len; - long size = 0; - final byte[] buffer = new byte[8192]; - - while ( size < numBytes && (len = in.read(buffer, 0, (int) Math.min(buffer.length, numBytes - size))) > 0 ) { - out.write(buffer, 0, len); - size += len; - } - - if ( size != numBytes ) { - throw new EOFException("Expected " + numBytes + " bytes but received only " + size); - } - } - }); - - return flowFile; - } catch (final EOFException e) { - session.rollback(); - - // we throw the general IOException here because we did not expect to hit EOFException - throw e; - } - } - - private void writeString(final String val, final DataOutputStream out) throws IOException { - final byte[] bytes = val.getBytes("UTF-8"); - out.writeInt(bytes.length); - out.write(bytes); - } - - - private String readString(final DataInputStream in) throws IOException { - final int numBytes = in.readInt(); - final byte[] bytes = new byte[numBytes]; - StreamUtils.fillBuffer(in, bytes, true); - return new String(bytes, "UTF-8"); - } - - @Override - public List<Integer> getSupportedVersions() { - return versionNegotiator.getSupportedVersions(); - } - - @Override - public VersionNegotiator getVersionNegotiator() { - return versionNegotiator; - } - - @Override - public String toString() { - return "Standard FlowFile Codec, Version " + versionNegotiator.getVersion(); - } - - @Override - public String getResourceName() { - return "StandardFlowFileCodec"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java deleted file mode 100644 index 926809c..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -import org.apache.nifi.remote.codec.FlowFileCodec; - -public class UnsupportedCodecException extends RuntimeException { - private static final long serialVersionUID = 198234789237L; - - public UnsupportedCodecException(final String codecName) { - super("Codec " + codecName + " is not supported"); - } - - public UnsupportedCodecException(final FlowFileCodec codec, final int version) { - super("Codec " + codec.getClass().getName() + " does not support Version " + version); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java deleted file mode 100644 index 0822b6a..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket; - -import java.io.IOException; -import java.nio.channels.SocketChannel; - -import org.apache.nifi.remote.AbstractCommunicationsSession; - -public class SocketChannelCommunicationsSession extends AbstractCommunicationsSession { - private final SocketChannel channel; - private final SocketChannelInput request; - private final SocketChannelOutput response; - private int timeout = 30000; - - public SocketChannelCommunicationsSession(final SocketChannel socketChannel, final String uri) throws IOException { - super(uri); - request = new SocketChannelInput(socketChannel); - response = new SocketChannelOutput(socketChannel); - channel = socketChannel; - socketChannel.configureBlocking(false); - } - - @Override - public boolean isClosed() { - return !channel.isConnected(); - } - - @Override - public SocketChannelInput getInput() { - return request; - } - - @Override - public SocketChannelOutput getOutput() { - return response; - } - - @Override - public void setTimeout(final int millis) throws IOException { - request.setTimeout(millis); - response.setTimeout(millis); - this.timeout = millis; - } - - @Override - public int getTimeout() throws IOException { - return timeout; - } - - @Override - public void close() throws IOException { - channel.close(); - } - - @Override - public boolean isDataAvailable() { - return request.isDataAvailable(); - } - - @Override - public long getBytesWritten() { - return response.getBytesWritten(); - } - - @Override - public long getBytesRead() { - return request.getBytesRead(); - } - - @Override - public void interrupt() { - request.interrupt(); - response.interrupt(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java deleted file mode 100644 index 9e451fd..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.channels.SocketChannel; - -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.ByteCountingInputStream; -import org.apache.nifi.remote.io.InterruptableInputStream; -import org.apache.nifi.remote.protocol.CommunicationsInput; - -public class SocketChannelInput implements CommunicationsInput { - private final SocketChannelInputStream socketIn; - private final ByteCountingInputStream countingIn; - private final InputStream bufferedIn; - private final InterruptableInputStream interruptableIn; - - public SocketChannelInput(final SocketChannel socketChannel) throws IOException { - this.socketIn = new SocketChannelInputStream(socketChannel); - countingIn = new ByteCountingInputStream(socketIn); - bufferedIn = new BufferedInputStream(countingIn); - interruptableIn = new InterruptableInputStream(bufferedIn); - } - - @Override - public InputStream getInputStream() throws IOException { - return interruptableIn; - } - - public void setTimeout(final int millis) { - socketIn.setTimeout(millis); - } - - public boolean isDataAvailable() { - try { - return interruptableIn.available() > 0; - } catch (final Exception e) { - return false; - } - } - - @Override - public long getBytesRead() { - return countingIn.getBytesRead(); - } - - public void interrupt() { - interruptableIn.interrupt(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java deleted file mode 100644 index 26c0164..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.channels.SocketChannel; - -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.ByteCountingOutputStream; -import org.apache.nifi.remote.io.InterruptableOutputStream; -import org.apache.nifi.remote.protocol.CommunicationsOutput; - -public class SocketChannelOutput implements CommunicationsOutput { - private final SocketChannelOutputStream socketOutStream; - private final ByteCountingOutputStream countingOut; - private final OutputStream bufferedOut; - private final InterruptableOutputStream interruptableOut; - - public SocketChannelOutput(final SocketChannel socketChannel) throws IOException { - socketOutStream = new SocketChannelOutputStream(socketChannel); - countingOut = new ByteCountingOutputStream(socketOutStream); - bufferedOut = new BufferedOutputStream(countingOut); - interruptableOut = new InterruptableOutputStream(bufferedOut); - } - - @Override - public OutputStream getOutputStream() throws IOException { - return interruptableOut; - } - - public void setTimeout(final int timeout) { - socketOutStream.setTimeout(timeout); - } - - @Override - public long getBytesWritten() { - return countingOut.getBytesWritten(); - } - - public void interrupt() { - interruptableOut.interrupt(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java deleted file mode 100644 index dca1d84..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket.ssl; - -import java.io.IOException; - -import org.apache.nifi.remote.AbstractCommunicationsSession; - -public class SSLSocketChannelCommunicationsSession extends AbstractCommunicationsSession { - private final SSLSocketChannel channel; - private final SSLSocketChannelInput request; - private final SSLSocketChannelOutput response; - - public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) { - super(uri); - request = new SSLSocketChannelInput(channel); - response = new SSLSocketChannelOutput(channel); - this.channel = channel; - } - - @Override - public SSLSocketChannelInput getInput() { - return request; - } - - @Override - public SSLSocketChannelOutput getOutput() { - return response; - } - - @Override - public void setTimeout(final int millis) throws IOException { - channel.setTimeout(millis); - } - - @Override - public int getTimeout() throws IOException { - return channel.getTimeout(); - } - - @Override - public void close() throws IOException { - channel.close(); - } - - @Override - public boolean isClosed() { - return channel.isClosed(); - } - - @Override - public boolean isDataAvailable() { - try { - return request.isDataAvailable(); - } catch (final Exception e) { - return false; - } - } - - @Override - public long getBytesWritten() { - return response.getBytesWritten(); - } - - @Override - public long getBytesRead() { - return request.getBytesRead(); - } - - @Override - public void interrupt() { - channel.interrupt(); - } - - @Override - public String toString() { - return super.toString() + "[SSLSocketChannel=" + channel + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java deleted file mode 100644 index 60ef33f..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket.ssl; - -import java.io.IOException; -import java.io.InputStream; - -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.ByteCountingInputStream; -import org.apache.nifi.remote.protocol.CommunicationsInput; - -public class SSLSocketChannelInput implements CommunicationsInput { - private final SSLSocketChannelInputStream in; - private final ByteCountingInputStream countingIn; - private final InputStream bufferedIn; - - public SSLSocketChannelInput(final SSLSocketChannel socketChannel) { - in = new SSLSocketChannelInputStream(socketChannel); - countingIn = new ByteCountingInputStream(in); - this.bufferedIn = new BufferedInputStream(countingIn); - } - - @Override - public InputStream getInputStream() throws IOException { - return bufferedIn; - } - - public boolean isDataAvailable() throws IOException { - return bufferedIn.available() > 0; - } - - @Override - public long getBytesRead() { - return countingIn.getBytesRead(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java deleted file mode 100644 index dc3d68f..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket.ssl; - -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.ByteCountingOutputStream; -import org.apache.nifi.remote.protocol.CommunicationsOutput; - -public class SSLSocketChannelOutput implements CommunicationsOutput { - private final OutputStream out; - private final ByteCountingOutputStream countingOut; - - public SSLSocketChannelOutput(final SSLSocketChannel channel) { - countingOut = new ByteCountingOutputStream(new SSLSocketChannelOutputStream(channel)); - out = new BufferedOutputStream(countingOut); - } - - @Override - public OutputStream getOutputStream() throws IOException { - return out; - } - - @Override - public long getBytesWritten() { - return countingOut.getBytesWritten(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java index a526f4c..391d52b 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java @@ -21,9 +21,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.Collection; -import org.apache.nifi.cluster.ClusterNodeInformation; -import org.apache.nifi.cluster.NodeInformant; -import org.apache.nifi.cluster.NodeInformation; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -31,12 +28,14 @@ import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.StandardVersionNegotiator; import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformant; +import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.ServerProtocol; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java deleted file mode 100644 index c4519cd..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol.socket; - -public enum HandshakeProperty { - GZIP, - PORT_IDENTIFIER, - REQUEST_EXPIRATION_MILLIS; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java deleted file mode 100644 index eae1940..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol.socket; - -import java.io.DataInputStream; -import java.io.IOException; - -import org.apache.nifi.remote.exception.ProtocolException; - -public class Response { - private final ResponseCode code; - private final String message; - - private Response(final ResponseCode code, final String explanation) { - this.code = code; - this.message = explanation; - } - - public ResponseCode getCode() { - return code; - } - - public String getMessage() { - return message; - } - - public static Response read(final DataInputStream in) throws IOException, ProtocolException { - final ResponseCode code = ResponseCode.readCode(in); - final String message = code.containsMessage() ? in.readUTF() : null; - return new Response(code, message); - } - - @Override - public String toString() { - return code + ": " + message; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java deleted file mode 100644 index 0e588cd..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol.socket; - -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.nifi.remote.exception.ProtocolException; - - -public enum ResponseCode { - RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of - // ResponseCode, so that we can indicate a 0 followed by some other bytes - - // handshaking properties - PROPERTIES_OK(1, "Properties OK", false), - UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true), - ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true), - MISSING_PROPERTY(232, "Missing Property", true), - - // transaction indicators - CONTINUE_TRANSACTION(10, "Continue Transaction", false), - FINISH_TRANSACTION(11, "Finish Transaction", false), - CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum - TRANSACTION_FINISHED(13, "Transaction Finished", false), - TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false), - BAD_CHECKSUM(19, "Bad Checksum", false), - - // data availability indicators - MORE_DATA(20, "More Data Exists", false), - NO_MORE_DATA(21, "No More Data Exists", false), - - // port state indicators - UNKNOWN_PORT(200, "Unknown Port", false), - PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true), - PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false), - - // authorization - UNAUTHORIZED(240, "User Not Authorized", true), - - // error indicators - ABORT(250, "Abort", true), - UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false), - END_OF_STREAM(255, "End of Stream", false); - - private static final ResponseCode[] codeArray = new ResponseCode[256]; - - static { - for ( final ResponseCode responseCode : ResponseCode.values() ) { - codeArray[responseCode.getCode()] = responseCode; - } - } - - private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R'; - private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C'; - private final int code; - private final byte[] codeSequence; - private final String description; - private final boolean containsMessage; - - private ResponseCode(final int code, final String description, final boolean containsMessage) { - this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code}; - this.code = code; - this.description = description; - this.containsMessage = containsMessage; - } - - public int getCode() { - return code; - } - - public byte[] getCodeSequence() { - return codeSequence; - } - - @Override - public String toString() { - return description; - } - - public boolean containsMessage() { - return containsMessage; - } - - public void writeResponse(final DataOutputStream out) throws IOException { - if ( containsMessage() ) { - throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation"); - } - - out.write(getCodeSequence()); - out.flush(); - } - - public void writeResponse(final DataOutputStream out, final String explanation) throws IOException { - if ( !containsMessage() ) { - throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation"); - } - - out.write(getCodeSequence()); - out.writeUTF(explanation); - out.flush(); - } - - static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException { - final int byte1 = in.read(); - if ( byte1 < 0 ) { - throw new EOFException(); - } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) { - throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); - } - - final int byte2 = in.read(); - if ( byte2 < 0 ) { - throw new EOFException(); - } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) { - throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); - } - - final int byte3 = in.read(); - if ( byte3 < 0 ) { - throw new EOFException(); - } - - final ResponseCode responseCode = codeArray[byte3]; - if (responseCode == null) { - throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code"); - } - return responseCode; - } - - public static ResponseCode fromSequence(final byte[] value) { - final int code = value[3] & 0xFF; - final ResponseCode responseCode = codeArray[code]; - return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : responseCode; - } -} \ No newline at end of file