http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java new file mode 100644 index 0000000..8c23e28 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java @@ -0,0 +1,835 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.client.socket; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Pattern; + +import javax.net.ssl.SSLContext; +import javax.security.cert.CertificateExpiredException; +import javax.security.cert.CertificateNotYetValidException; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.RemoteDestination; +import org.apache.nifi.remote.RemoteResourceInitiator; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; +import 
org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.remote.util.PeerStatusCache;
import org.apache.nifi.remote.util.RemoteNiFiUtils;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pool of site-to-site connections ({@link EndpointConnectionState}) to the nodes of a remote
 * NiFi instance or cluster.
 *
 * <p>The pool hands out idle connections from an internal queue, creates new ones when the queue
 * is empty, keeps a per-peer penalization table so that misbehaving peers are skipped for a while,
 * and runs two background tasks on a single scheduled thread: one that refreshes the list of
 * remote peers and one that closes connections that have been idle for too long.</p>
 *
 * <p>Callers obtain a connection with {@link #getEndpointConnectionState}, return it with
 * {@link #offer(EndpointConnectionState)} or discard it with {@link #terminate(EndpointConnectionState)},
 * and must call {@link #shutdown()} when the pool is no longer needed.</p>
 */
public class EndpointConnectionStatePool {
    public static final long PEER_REFRESH_PERIOD = 60000L;
    public static final String CATEGORY = "Site-to-Site";
    public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);

    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);

    // Pooled connections that have not been used for this long are closed by the maintenance task.
    private static final long IDLE_CONNECTION_EXPIRATION_MILLIS = 10000L;

    private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionStatePool.class);

    private final BlockingQueue<EndpointConnectionState> connectionStateQueue = new LinkedBlockingQueue<>();
    private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();
    private final URI clusterUrl;
    private final String apiUri;

    private final AtomicLong peerIndex = new AtomicLong(0L);

    private final ReentrantLock peerRefreshLock = new ReentrantLock();
    private volatile List<PeerStatus> peerStatuses;
    private volatile long peerRefreshTime = 0L;
    private volatile PeerStatusCache peerStatusCache;
    private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();

    private final File peersFile;
    private final EventReporter eventReporter;
    private final SSLContext sslContext;
    private final ScheduledExecutorService taskExecutor;

    // The fields below describe the remote instance and are guarded by listeningPortRWLock.
    private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
    private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
    private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock();
    private Integer siteToSitePort;
    private Boolean siteToSiteSecure;
    private long remoteRefreshTime;
    private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier
    private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier

    private volatile int commsTimeout;

    /**
     * Creates a pool that does not use SSL.
     *
     * @param clusterUrl URL of the remote NiFi instance
     * @param commsTimeoutMillis communications timeout in milliseconds
     * @param eventReporter reporter used to surface warnings; may be {@code null}
     * @param persistenceFile file used to persist the known peers; may be {@code null}
     * @throws IllegalArgumentException if {@code clusterUrl} is not a valid URI
     */
    public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final EventReporter eventReporter, final File persistenceFile) {
        this(clusterUrl, commsTimeoutMillis, null, eventReporter, persistenceFile);
    }

    /**
     * Creates a pool, recovers previously persisted peers (if any) and starts the background
     * maintenance tasks.
     *
     * @param clusterUrl URL of the remote NiFi instance
     * @param commsTimeoutMillis communications timeout in milliseconds
     * @param sslContext SSL context used for secure communications; may be {@code null}
     * @param eventReporter reporter used to surface warnings; may be {@code null}
     * @param persistenceFile file used to persist the known peers; may be {@code null}
     * @throws IllegalArgumentException if {@code clusterUrl} is not a valid URI
     */
    public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
        try {
            this.clusterUrl = new URI(clusterUrl);
        } catch (final URISyntaxException e) {
            // Keep the cause so the offending position in the URL is not lost.
            throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl, e);
        }

        // Trim the trailing /
        String uriPath = this.clusterUrl.getPath();
        if (uriPath.endsWith("/")) {
            uriPath = uriPath.substring(0, uriPath.length() - 1);
        }
        apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";

        this.sslContext = sslContext;
        this.peersFile = persistenceFile;
        this.eventReporter = eventReporter;
        this.commsTimeout = commsTimeoutMillis;

        PeerStatusCache recoveredCache = null;
        if (persistenceFile != null && persistenceFile.exists()) {
            try {
                final Set<PeerStatus> recoveredStatuses = recoverPersistedPeerStatuses(persistenceFile);
                recoveredCache = new PeerStatusCache(recoveredStatuses, persistenceFile.lastModified());
            } catch (final IOException ioe) {
                logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe.toString());
            }
        }
        this.peerStatusCache = recoveredCache;

        // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused
        // connections and keep our list of peers up-to-date.
        taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

            @Override
            public Thread newThread(final Runnable r) {
                final Thread thread = defaultFactory.newThread(r);
                thread.setName("NiFi Site-to-Site Connection Pool Maintenance");
                // A housekeeping thread must not keep the JVM alive if shutdown() is never called.
                thread.setDaemon(true);
                return thread;
            }
        });

        taskExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                refreshPeers();
            }
        }, 0, 5, TimeUnit.SECONDS);

        taskExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                cleanupExpiredSockets();
            }
        }, 5, 5, TimeUnit.SECONDS);
    }

    /**
     * Obtains a connection using the protocol's default settings.
     *
     * @see #getEndpointConnectionState(RemoteDestination, TransferDirection, SiteToSiteClientConfig)
     */
    public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction)
            throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
        return getEndpointConnectionState(remoteDestination, direction, null);
    }

    /**
     * Obtains a connection to one of the remote peers, reusing a pooled connection when one is
     * available and creating (and handshaking) a new one otherwise.
     *
     * @param remoteDestination the remote port to communicate with
     * @param direction whether data will be sent or received; used to weight the choice of peer
     * @param config optional client configuration applied to newly created connections
     * @return a ready-to-use connection, or {@code null} if no peer is currently available
     *         (no peers are known, all peers are penalized, or all pooled connections are penalized)
     * @throws PortNotRunningException if the remote peer reports that the port is not running
     * @throws UnknownPortException if the remote peer reports that the port is not known
     * @throws IOException if unable to communicate with the remote peer
     * @throws HandshakeException if the handshake with the remote peer fails
     * @throws ProtocolException if the remote peer violates the protocol
     */
    public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config)
            throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
        // Pooled connections whose peer is penalized; they go back into the queue when we are done.
        final List<EndpointConnectionState> penalizedStates = new ArrayList<>();
        try {
            while (true) {
                final PeerStatus peerStatus = getNextPeerStatus(direction);
                if (peerStatus == null) {
                    return null;
                }

                final EndpointConnectionState pooledState = connectionStateQueue.poll();
                logger.debug("{} Connection State for {} = {}", this, clusterUrl, pooledState);

                if (pooledState == null) {
                    if (!penalizedStates.isEmpty()) {
                        // all available connections have been penalized.
                        return null;
                    }

                    final EndpointConnectionState created = createConnectionState(peerStatus, remoteDestination, config);
                    if (created != null) {
                        return created;
                    }

                    // The peer's destination is full and the peer is now penalized; try another peer.
                    continue;
                }

                final Peer pooledPeer = pooledState.getPeer();
                if (pooledPeer.isPenalized()) {
                    // we have a connection, but it's penalized. We want to add it back to the queue
                    // when we've found one to use.
                    penalizedStates.add(pooledState);
                    continue;
                }

                final long millisSinceLastUse = System.currentTimeMillis() - pooledState.getLastTimeUsed();
                final boolean expired = commsTimeout > 0L && millisSinceLastUse >= commsTimeout;
                final boolean incomplete = pooledState.getCodec() == null
                        || pooledPeer.getCommunicationsSession() == null
                        || pooledState.getSocketClientProtocol() == null;

                if (expired || incomplete) {
                    // Release the resources instead of silently dropping the connection.
                    cleanup(pooledState.getSocketClientProtocol(), pooledPeer);
                    continue;
                }

                return pooledState;
            }
        } finally {
            if (!penalizedStates.isEmpty()) {
                connectionStateQueue.addAll(penalizedStates);
            }
        }
    }

    /**
     * Opens a new connection to the given peer, negotiates the protocol version, performs the
     * handshake and negotiates the codec.
     *
     * @return the new connection, or {@code null} if the peer reported that the destination is
     *         full (the peer is penalized and the connection is closed in that case)
     */
    private EndpointConnectionState createConnectionState(final PeerStatus peerStatus, final RemoteDestination remoteDestination, final SiteToSiteClientConfig config)
            throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
        final SocketClientProtocol protocol = new SocketClientProtocol();
        protocol.setDestination(remoteDestination);

        CommunicationsSession commsSession = null;
        try {
            commsSession = establishSiteToSiteConnection(peerStatus);
            final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
            final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
            RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos);
        } catch (final HandshakeException e) {
            abortConnectionAttempt(peerStatus, commsSession, remoteDestination, e);
            throw e;
        } catch (final IOException e) {
            abortConnectionAttempt(peerStatus, commsSession, remoteDestination, e);
            throw e;
        }

        final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
        final Peer peer = new Peer(commsSession, peerUrl, clusterUrl.toString());

        // set properties based on config
        if (config != null) {
            protocol.setTimeout((int) config.getTimeout(TimeUnit.MILLISECONDS));
            protocol.setPreferredBatchCount(config.getPreferredBatchCount());
            protocol.setPreferredBatchSize(config.getPreferredBatchSize());
            protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
        }

        final FlowFileCodec codec;
        try {
            protocol.handshake(peer);

            // handle error cases
            if (protocol.isDestinationFull()) {
                logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer);
                penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
                // Also penalize the exact status object used for peer selection so that it is skipped.
                penalizeStatus(peerStatus, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
                // There is no pooled state to give back at this point; close the connection we just opened.
                cleanup(protocol, peer);
                return null;
            } else if (protocol.isPortInvalid()) {
                penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
                cleanup(protocol, peer);
                throw new PortNotRunningException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not running");
            } else if (protocol.isPortUnknown()) {
                penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
                cleanup(protocol, peer);
                throw new UnknownPortException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not known");
            }

            // negotiate the FlowFileCodec to use
            codec = protocol.negotiateCodec(peer);
        } catch (final PortNotRunningException | UnknownPortException e) {
            // Already penalized and cleaned up above.
            throw e;
        } catch (final Exception e) {
            penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
            cleanup(protocol, peer);

            final String message = String.format("%s failed to communicate with %s due to %s", this, peer, e.toString());
            logger.error(message);
            if (logger.isDebugEnabled()) {
                logger.error("", e);
            }
            throw e;
        }

        return new EndpointConnectionState(peer, protocol, codec);
    }

    /**
     * Penalizes the peer, closes the (possibly half-open) session and logs the failure after a
     * connection attempt or protocol negotiation failed. The caller rethrows the cause.
     */
    private void abortConnectionAttempt(final PeerStatus peerStatus, final CommunicationsSession commsSession, final RemoteDestination remoteDestination, final Exception cause) {
        penalizeStatus(peerStatus, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));

        if (commsSession != null) {
            try {
                commsSession.close();
            } catch (final IOException ioe) {
                logger.debug("{} Failed to close connection to {} due to {}", this, peerStatus, ioe.toString());
            }
        }

        logger.error("{} failed to establish a site-to-site connection with {} due to {}", this, peerStatus, cause.toString());
        if (logger.isDebugEnabled()) {
            logger.error("", cause);
        }
    }

    /**
     * Returns a connection to the pool so that it can be reused.
     *
     * @param endpointConnectionState the connection to return
     * @return {@code true} if the connection was added to the pool, {@code false} if it has no
     *         peer or the peer has no URL
     */
    public boolean offer(final EndpointConnectionState endpointConnectionState) {
        final Peer peer = endpointConnectionState.getPeer();
        if (peer == null) {
            return false;
        }

        final String url = peer.getUrl();
        if (url == null) {
            return false;
        }

        return connectionStateQueue.offer(endpointConnectionState);
    }

    /**
     * Updates internal state map to penalize a PeerStatus that points to the specified peer.
     * An existing, later expiration is never shortened.
     *
     * @param peer the peer to penalize
     * @param penalizationMillis how long, in milliseconds from now, the peer should be avoided
     */
    public void penalize(final Peer peer, final long penalizationMillis) {
        String host;
        int port;
        try {
            final URI uri = new URI(peer.getUrl());
            host = uri.getHost();
            port = uri.getPort();
        } catch (final URISyntaxException e) {
            host = peer.getHost();
            port = -1;
        }

        penalizeStatus(new PeerStatus(host, port, true, 1), penalizationMillis);
    }

    /**
     * Records a penalization for the given status, keeping whichever expiration is later.
     */
    private void penalizeStatus(final PeerStatus status, final long penalizationMillis) {
        final Long currentExpiration = peerTimeoutExpirations.get(status);
        final long existing = (currentExpiration == null) ? 0L : currentExpiration.longValue();
        final long newExpiration = Math.max(existing, System.currentTimeMillis() + penalizationMillis);
        peerTimeoutExpirations.put(status, Long.valueOf(newExpiration));
    }

    /**
     * Shuts down the protocol (when both arguments are non-null) and closes the peer. This is a
     * best-effort operation: failures are logged at debug level and never propagated.
     */
    private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
        if (protocol != null && peer != null) {
            try {
                protocol.shutdown(peer);
            } catch (final TransmissionDisabledException e) {
                // User disabled transmission.... do nothing.
                logger.debug(this + " Transmission Disabled by User");
            } catch (final IOException e) {
                logger.debug("{} Failed to shut down protocol for {} due to {}", this, peer, e.toString());
            }
        }

        if (peer != null) {
            try {
                peer.close();
            } catch (final TransmissionDisabledException e) {
                // User disabled transmission.... do nothing.
                logger.debug(this + " Transmission Disabled by User");
            } catch (final IOException e) {
                logger.debug("{} Failed to close {} due to {}", this, peer, e.toString());
            }
        }
    }

    /**
     * Picks the next non-penalized peer in round-robin order from the weighted peer list,
     * rebuilding that list first if it is missing, empty or older than {@link #PEER_REFRESH_PERIOD}.
     *
     * @return the next peer to use, or {@code null} if no peer is known or all are penalized
     */
    private PeerStatus getNextPeerStatus(final TransferDirection direction) {
        List<PeerStatus> peerList = peerStatuses;
        final boolean stale = peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD;

        // tryLock: only one thread rebuilds the list; the others keep using the list they already have.
        if (stale && peerRefreshLock.tryLock()) {
            try {
                try {
                    peerList = createPeerStatusList(direction);
                } catch (final Exception e) {
                    final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
                    logger.warn(message);
                    if (logger.isDebugEnabled()) {
                        logger.warn("", e);
                    }

                    if (eventReporter != null) {
                        eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
                    }
                }

                this.peerStatuses = peerList;
                peerRefreshTime = System.currentTimeMillis();
            } finally {
                peerRefreshLock.unlock();
            }
        }

        if (peerList == null || peerList.isEmpty()) {
            return null;
        }

        // Try each slot of the list at most once, starting from the shared round-robin index.
        for (int i = 0; i < peerList.size(); i++) {
            final long idx = peerIndex.getAndIncrement();
            final int listIndex = (int) (idx % peerList.size());
            final PeerStatus peerStatus = peerList.get(listIndex);

            if (isPenalized(peerStatus)) {
                logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
            } else {
                return peerStatus;
            }
        }

        logger.debug("{} All peers appear to be penalized; returning null", this);
        return null;
    }

    /**
     * @return {@code true} if a penalization is recorded for the peer and has not yet expired
     */
    private boolean isPenalized(final PeerStatus peerStatus) {
        final Long expirationEnd = peerTimeoutExpirations.get(peerStatus);
        return expirationEnd != null && expirationEnd.longValue() > System.currentTimeMillis();
    }

    /**
     * Builds the weighted list of peers from the currently cached peer statuses.
     *
     * @return the weighted list; empty if no peer statuses are cached
     */
    private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
        final Set<PeerStatus> statuses = getPeerStatuses();
        if (statuses == null) {
            return new ArrayList<>();
        }

        final List<NodeInformation> nodeInfos = new ArrayList<>();
        for (final PeerStatus peerStatus : statuses) {
            nodeInfos.add(new NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, peerStatus.isSecure(), peerStatus.getFlowFileCount()));
        }

        final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
        clusterNodeInfo.setNodeInformation(nodeInfos);
        return formulateDestinationList(clusterNodeInfo, direction);
    }

    /**
     * Returns the cached peer statuses. When the cache is older than the cache period the flow
     * file counts are no longer trusted, so every peer is returned with an equal count of 1.
     *
     * @return the peer statuses, or {@code null} if nothing is cached
     */
    private Set<PeerStatus> getPeerStatuses() {
        final PeerStatusCache cache = this.peerStatusCache;
        if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) {
            return null;
        }

        if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) {
            final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size());
            for (final PeerStatus status : cache.getStatuses()) {
                equalizedSet.add(new PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1));
            }

            return equalizedSet;
        }

        return cache.getStatuses();
    }

    /**
     * Connects to the remote instance, asks it for its current list of peers, persists that list
     * and returns it. The connection is always closed before this method returns.
     *
     * @throws IOException if unable to communicate with the remote instance, or if it is not
     *         configured to accept site-to-site connections
     */
    private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
        final String hostname = clusterUrl.getHost();
        final Integer port = getSiteToSitePort();
        if (port == null) {
            // Avoid an unboxing NullPointerException when the remote instance has no site-to-site port.
            throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
        }

        final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
        final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
        final SocketClientProtocol clientProtocol = new SocketClientProtocol();

        final Set<PeerStatus> statuses;
        try {
            final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
            final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
            RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, dis, dos);

            clientProtocol.setTimeout(commsTimeout);
            clientProtocol.handshake(peer, null);
            statuses = clientProtocol.getPeerStatuses(peer);
            persistPeerStatuses(statuses);

            try {
                clientProtocol.shutdown(peer);
            } catch (final IOException e) {
                final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString());
                logger.warn(message);
                if (logger.isDebugEnabled()) {
                    logger.warn("", e);
                }
            }
        } finally {
            // Close on every path so that a failed handshake does not leak the socket.
            try {
                peer.close();
            } catch (final IOException e) {
                final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString());
                logger.warn(message);
                if (logger.isDebugEnabled()) {
                    logger.warn("", e);
                }
            }
        }

        return statuses;
    }

    /**
     * Writes the peers to the persistence file, one {@code hostname:port:secure} entry per line,
     * encoded as UTF-8. Does nothing when no file is configured or there is nothing to write.
     */
    private void persistPeerStatuses(final Set<PeerStatus> statuses) {
        if (peersFile == null || statuses == null) {
            return;
        }

        try (final OutputStream fos = new FileOutputStream(peersFile);
                final OutputStream out = new BufferedOutputStream(fos)) {

            for (final PeerStatus status : statuses) {
                final String line = status.getHostname() + ":" + status.getPort() + ":" + status.isSecure() + "\n";
                out.write(line.getBytes(StandardCharsets.UTF_8));
            }

        } catch (final IOException e) {
            logger.error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString(), e);
        }
    }

    /**
     * Reads the peers written by {@link #persistPeerStatuses(Set)}. Lines that do not have exactly
     * three colon-separated fields, or whose port is not a number, are skipped.
     *
     * @return the recovered peers (each with a flow file count of 1), or {@code null} if the file
     *         does not exist
     */
    private Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException {
        if (!file.exists()) {
            return null;
        }

        final Set<PeerStatus> statuses = new HashSet<>();
        // The file is written as UTF-8, so it must be read as UTF-8 rather than the platform charset.
        try (final InputStream fis = new FileInputStream(file);
                final BufferedReader reader = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8))) {

            String line;
            while ((line = reader.readLine()) != null) {
                final String[] splits = line.split(Pattern.quote(":"));
                if (splits.length != 3) {
                    continue;
                }

                final int port;
                try {
                    port = Integer.parseInt(splits[1]);
                } catch (final NumberFormatException nfe) {
                    // One corrupt line must not prevent the pool from being created.
                    logger.warn("Skipping malformed peer entry in {}: {}", file, line);
                    continue;
                }

                final String hostname = splits[0];
                final boolean secure = Boolean.parseBoolean(splits[2]);
                statuses.add(new PeerStatus(hostname, port, secure, 1));
            }
        }

        return statuses;
    }

    private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException {
        return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort());
    }

    /**
     * Opens a plain or SSL socket to the given host and port (depending on what the remote
     * instance reported) and writes the protocol's magic bytes.
     *
     * @throws IOException if the remote instance's site-to-site settings are not yet known, if a
     *         secure connection is required but no SSL context is configured, or if the
     *         connection cannot be established
     */
    private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException {
        final Boolean secure;
        remoteInfoReadLock.lock();
        try {
            secure = this.siteToSiteSecure;
        } finally {
            remoteInfoReadLock.unlock();
        }

        if (secure == null) {
            throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections");
        }

        final String destinationUri = "nifi://" + hostname + ":" + port;

        CommunicationsSession commsSession = null;
        try {
            if (secure.booleanValue()) {
                if (sslContext == null) {
                    throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
                }

                final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
                socketChannel.connect();

                commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);

                try {
                    commsSession.setUserDn(socketChannel.getDn());
                } catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
                    throw new IOException(ex);
                }
            } else {
                final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
                commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri);
            }

            commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
            commsSession.setUri(destinationUri);
        } catch (final IOException ioe) {
            if (commsSession != null) {
                try {
                    commsSession.close();
                } catch (final IOException closeFailure) {
                    // Do not let a failure to close mask the original problem.
                    logger.debug("{} Failed to close connection to {} due to {}", this, destinationUri, closeFailure.toString());
                }
            }

            throw ioe;
        }

        return commsSession;
    }

    /**
     * Builds a list of peers in which each node appears a number of times proportional to its
     * weight, so that iterating over the list round-robin yields a weighted distribution.
     *
     * <p>When sending, a node's weight is its share of the cluster's flow files (capped at 80%);
     * when receiving, the weight is one minus that share. Every node gets at least one entry.
     * When no flow file counts are available, every node gets exactly one entry.</p>
     *
     * @param clusterNodeInfo the nodes of the remote cluster
     * @param direction the direction of the transfer
     * @return the weighted list of peers; empty if there are no nodes
     */
    static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) {
        final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation();
        final int numDestinations = Math.max(128, nodeInfoSet.size());
        final Map<NodeInformation, Integer> entryCountMap = new HashMap<>();

        long totalFlowFileCount = 0L;
        for (final NodeInformation nodeInfo : nodeInfoSet) {
            totalFlowFileCount += nodeInfo.getTotalFlowFiles();
        }

        int totalEntries = 0;
        for (final NodeInformation nodeInfo : nodeInfoSet) {
            final int entries;
            if (totalFlowFileCount == 0L) {
                // No load information at all: weight every node equally instead of dividing by zero.
                entries = 1;
            } else {
                final int flowFileCount = nodeInfo.getTotalFlowFiles();
                // don't allow any node to get more than 80% of the data
                final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount));
                final double relativeWeighting = (direction == TransferDirection.RECEIVE) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles;
                entries = Math.max(1, (int) (numDestinations * relativeWeighting));
            }

            // If an equal node was already counted, replace its contribution rather than adding to it;
            // otherwise the list below would keep unfilled (null) slots.
            final Integer previous = entryCountMap.put(nodeInfo, entries);
            totalEntries += entries - (previous == null ? 0 : previous.intValue());
        }

        final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
        for (int i = 0; i < totalEntries; i++) {
            destinations.add(null);
        }

        // Spread each node's entries across the list, probing forward to the next free slot on collision.
        for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) {
            final NodeInformation nodeInfo = entry.getKey();
            final int numEntries = entry.getValue();

            final int skipIndex = numEntries;
            for (int i = 0; i < numEntries; i++) {
                int n = (skipIndex * i);
                while (true) {
                    final int index = n % destinations.size();
                    if (destinations.get(index) == null) {
                        destinations.set(index, new PeerStatus(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), nodeInfo.getTotalFlowFiles()));
                        break;
                    }
                    n++;
                }
            }
        }

        final StringBuilder distributionDescription = new StringBuilder();
        distributionDescription.append("New Weighted Distribution of Nodes:");
        for (final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet()) {
            final double percentage = entry.getValue() * 100D / (double) destinations.size();
            distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of FlowFiles");
        }
        logger.info(distributionDescription.toString());

        return destinations;
    }

    /**
     * Maintenance task: closes every pooled connection that has been idle for longer than the
     * idle expiration period and puts the remaining ones back into the pool.
     */
    private void cleanupExpiredSockets() {
        final List<EndpointConnectionState> survivors = new ArrayList<>();
        final long oldestAllowedUse = System.currentTimeMillis() - IDLE_CONNECTION_EXPIRATION_MILLIS;

        EndpointConnectionState state;
        while ((state = connectionStateQueue.poll()) != null) {
            if (state.getLastTimeUsed() < oldestAllowedUse) {
                // cleanup() shuts the protocol down and closes the peer.
                cleanup(state.getSocketClientProtocol(), state.getPeer());
            } else {
                survivors.add(state);
            }
        }

        connectionStateQueue.addAll(survivors);
    }

    /**
     * Stops the background tasks, clears all penalizations, interrupts active communications
     * and closes every pooled connection.
     */
    public void shutdown() {
        taskExecutor.shutdown();
        peerTimeoutExpirations.clear();

        for (final CommunicationsSession commsSession : activeCommsChannels) {
            commsSession.interrupt();
        }

        EndpointConnectionState state;
        while ((state = connectionStateQueue.poll()) != null) {
            cleanup(state.getSocketClientProtocol(), state.getPeer());
        }
    }

    /**
     * Closes the given connection instead of returning it to the pool.
     *
     * @param state the connection to close
     */
    public void terminate(final EndpointConnectionState state) {
        cleanup(state.getSocketClientProtocol(), state.getPeer());
    }

    /**
     * Maintenance task: fetches the current list of peers from the remote instance unless the
     * cached list is still fresh. Failures are logged and otherwise ignored.
     */
    private void refreshPeers() {
        final PeerStatusCache existingCache = peerStatusCache;
        if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) {
            return;
        }

        try {
            final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
            peerStatusCache = new PeerStatusCache(statuses);
            logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
        } catch (final Exception e) {
            logger.warn("{} Unable to refresh Remote Group's peers due to {}", this, e.toString());
            if (logger.isDebugEnabled()) {
                logger.warn("", e);
            }
        }
    }

    /**
     * @param portName the name of a remote input port
     * @return the identifier of that port, or {@code null} if the remote instance has no such port
     * @throws IOException if unable to communicate with the remote instance
     */
    public String getInputPortIdentifier(final String portName) throws IOException {
        return getPortIdentifier(portName, inputPortMap);
    }

    /**
     * @param portName the name of a remote output port
     * @return the identifier of that port, or {@code null} if the remote instance has no such port
     * @throws IOException if unable to communicate with the remote instance
     */
    public String getOutputPortIdentifier(final String portName) throws IOException {
        return getPortIdentifier(portName, outputPortMap);
    }

    /**
     * Looks the port up in the given map and, if it is not there, refreshes the remote
     * information once and looks again.
     */
    private String getPortIdentifier(final String portName, final Map<String, String> portMap) throws IOException {
        final String cachedIdentifier;
        remoteInfoReadLock.lock();
        try {
            cachedIdentifier = portMap.get(portName);
        } finally {
            remoteInfoReadLock.unlock();
        }

        if (cachedIdentifier != null) {
            return cachedIdentifier;
        }

        refreshRemoteInfo();

        remoteInfoReadLock.lock();
        try {
            return portMap.get(portName);
        } finally {
            remoteInfoReadLock.unlock();
        }
    }

    /**
     * Fetches the controller description from the remote instance's REST API and updates the
     * cached site-to-site port, secure flag and port name maps under the write lock.
     *
     * @return the controller description that was fetched
     * @throws IOException if unable to communicate with the remote instance
     */
    private ControllerDTO refreshRemoteInfo() throws IOException {
        final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https");
        final RemoteNiFiUtils utils = new RemoteNiFiUtils(webInterfaceSecure ? sslContext : null);
        // The REST call is made outside the lock so that readers are not blocked on network I/O.
        final ControllerDTO controller = utils.getController(URI.create(apiUri + "/controller"), commsTimeout);

        remoteInfoWriteLock.lock();
        try {
            this.siteToSitePort = controller.getRemoteSiteListeningPort();
            this.siteToSiteSecure = controller.isSiteToSiteSecure();

            inputPortMap.clear();
            for (final PortDTO inputPort : controller.getInputPorts()) {
                inputPortMap.put(inputPort.getName(), inputPort.getId());
            }

            outputPortMap.clear();
            for (final PortDTO outputPort : controller.getOutputPorts()) {
                outputPortMap.put(outputPort.getName(), outputPort.getId());
            }

            this.remoteRefreshTime = System.currentTimeMillis();
        } finally {
            remoteInfoWriteLock.unlock();
        }

        return controller;
    }

    /**
     * @return the port that the remote instance is listening on for
     * site-to-site communication, or <code>null</code> if the remote instance
     * is not configured to allow site-to-site communications.
     *
     * @throws IOException if unable to communicate with the remote instance
     */
    private Integer getSiteToSitePort() throws IOException {
        remoteInfoReadLock.lock();
        try {
            final Integer cachedPort = this.siteToSitePort;
            if (cachedPort != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
                return cachedPort;
            }
        } finally {
            remoteInfoReadLock.unlock();
        }

        final ControllerDTO controller = refreshRemoteInfo();
        return controller.getRemoteSiteListeningPort();
    }

    /**
     * Returns {@code true} if the remote instance is configured for secure site-to-site communications,
     * {@code false} otherwise. The cached answer is used while it is younger than
     * {@link #REMOTE_REFRESH_MILLIS}; otherwise the remote instance is queried.
     *
     * @return whether the remote instance requires secure site-to-site communications
     * @throws IOException if unable to communicate with the remote instance
     */
    public boolean isSecure() throws IOException {
        remoteInfoReadLock.lock();
        try {
            final Boolean secure = this.siteToSiteSecure;
            if (secure != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
                return secure;
            }
        } finally {
            remoteInfoReadLock.unlock();
        }

        final ControllerDTO controller = refreshRemoteInfo();
        // Treat "not reported" as not secure instead of failing with an unboxing NullPointerException.
        final Boolean refreshedSecure = controller.isSiteToSiteSecure();
        return refreshedSecure != null && refreshedSecure.booleanValue();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java new file mode 100644 index 0000000..0494d04 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.client.socket; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.remote.RemoteDestination; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.PortNotRunningException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.UnknownPortException; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.util.ObjectHolder; + +public class SocketClient implements SiteToSiteClient { + private final SiteToSiteClientConfig config; + private final EndpointConnectionStatePool pool; + private final boolean compress; + private final String portName; + private final long penalizationNanos; + private volatile String portIdentifier; + + public SocketClient(final SiteToSiteClientConfig config) { + pool = new EndpointConnectionStatePool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS), + config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile()); + + this.config = config; + this.compress = config.isUseCompression(); + this.portIdentifier = config.getPortIdentifier(); + this.portName = config.getPortName(); + this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS); + } + + @Override + public SiteToSiteClientConfig getConfig() { + return config; + } + + @Override + public boolean isSecure() throws IOException { + return pool.isSecure(); + } + + private String getPortIdentifier(final TransferDirection direction) throws IOException { + final String id = this.portIdentifier; + if ( id != null ) { + return id; + } + + if ( direction == TransferDirection.SEND ) { + return pool.getInputPortIdentifier(this.portName); + } else { + 
return pool.getOutputPortIdentifier(this.portName); + } + } + + + @Override + public Transaction createTransaction(final TransferDirection direction) throws IOException { + final String portId = getPortIdentifier(TransferDirection.SEND); + + if ( portId == null ) { + throw new IOException("Could not find Port with name " + portName + " for remote NiFi instance"); + } + + final RemoteDestination remoteDestination = new RemoteDestination() { + @Override + public String getIdentifier() { + return portId; + } + + @Override + public long getYieldPeriod(final TimeUnit timeUnit) { + return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS); + } + + @Override + public boolean isUseCompression() { + return compress; + } + }; + + final EndpointConnectionState connectionState; + try { + connectionState = pool.getEndpointConnectionState(remoteDestination, direction); + } catch (final ProtocolException | HandshakeException | PortNotRunningException | UnknownPortException e) { + throw new IOException(e); + } + + final Transaction transaction = connectionState.getSocketClientProtocol().startTransaction( + connectionState.getPeer(), connectionState.getCodec(), direction); + + // Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever + // the transaction is either completed or canceled. 
+ final ObjectHolder<EndpointConnectionState> connectionStateRef = new ObjectHolder<>(connectionState); + return new Transaction() { + @Override + public void confirm() throws IOException { + transaction.confirm(); + } + + @Override + public void complete(final boolean requestBackoff) throws IOException { + try { + transaction.complete(requestBackoff); + } finally { + final EndpointConnectionState state = connectionStateRef.get(); + if ( state != null ) { + pool.offer(connectionState); + connectionStateRef.set(null); + } + } + } + + @Override + public void cancel(final String explanation) throws IOException { + try { + transaction.cancel(explanation); + } finally { + final EndpointConnectionState state = connectionStateRef.get(); + if ( state != null ) { + pool.terminate(connectionState); + connectionStateRef.set(null); + } + } + } + + @Override + public void error() { + try { + transaction.error(); + } finally { + final EndpointConnectionState state = connectionStateRef.get(); + if ( state != null ) { + pool.terminate(connectionState); + connectionStateRef.set(null); + } + } + } + + @Override + public void send(final DataPacket dataPacket) throws IOException { + transaction.send(dataPacket); + } + + @Override + public DataPacket receive() throws IOException { + return transaction.receive(); + } + + @Override + public TransactionState getState() throws IOException { + return transaction.getState(); + } + + }; + } + + + @Override + public void close() throws IOException { + pool.shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java new file mode 100644 index 0000000..6ca5812 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Mutable bean holding the details of a single node: its hostname, site-to-site port, API port,
 * whether site-to-site is secure, and its total FlowFile count. Every property has a plain
 * getter/setter pair and starts out at the Java default for its type.
 */
public class AdaptedNodeInformation {

    private String hostname;
    private Integer siteToSitePort;
    private int apiPort;
    private boolean siteToSiteSecure;
    private int totalFlowFiles;

    /** @return the node's hostname, or <code>null</code> if none has been set */
    public String getHostname() {
        return this.hostname;
    }

    /** @param hostname the node's hostname */
    public void setHostname(final String hostname) {
        this.hostname = hostname;
    }

    /** @return the site-to-site port, or <code>null</code> if none has been set */
    public Integer getSiteToSitePort() {
        return this.siteToSitePort;
    }

    /** @param siteToSitePort the site-to-site port; may be <code>null</code> */
    public void setSiteToSitePort(final Integer siteToSitePort) {
        this.siteToSitePort = siteToSitePort;
    }

    /** @return the API port */
    public int getApiPort() {
        return this.apiPort;
    }

    /** @param apiPort the API port */
    public void setApiPort(final int apiPort) {
        this.apiPort = apiPort;
    }

    /** @return the secure flag for site-to-site communications */
    public boolean isSiteToSiteSecure() {
        return this.siteToSiteSecure;
    }

    /** @param isSiteToSiteSecure the secure flag for site-to-site communications */
    public void setSiteToSiteSecure(final boolean isSiteToSiteSecure) {
        this.siteToSiteSecure = isSiteToSiteSecure;
    }

    /** @return the total FlowFile count recorded for the node */
    public int getTotalFlowFiles() {
        return this.totalFlowFiles;
    }

    /** @param totalFlowFiles the total FlowFile count to record for the node */
    public void setTotalFlowFiles(final int totalFlowFiles) {
        this.totalFlowFiles = totalFlowFiles;
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.cluster; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collection; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +@XmlRootElement +public class ClusterNodeInformation { + + private Collection<NodeInformation> nodeInfo; + + private static final JAXBContext JAXB_CONTEXT; + + static { + try { + JAXB_CONTEXT = JAXBContext.newInstance(ClusterNodeInformation.class); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext.", e); + } + } + + public ClusterNodeInformation() { + this.nodeInfo = null; + } + + public void setNodeInformation(final Collection<NodeInformation> nodeInfo) { + this.nodeInfo = nodeInfo; + } + + @XmlJavaTypeAdapter(NodeInformationAdapter.class) + public Collection<NodeInformation> getNodeInformation() { + return nodeInfo; + } + + public void marshal(final OutputStream os) throws JAXBException { + final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); + marshaller.marshal(this, os); + } + + public static ClusterNodeInformation unmarshal(final InputStream is) throws JAXBException { + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + 
return (ClusterNodeInformation) unmarshaller.unmarshal(is); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java new file mode 100644 index 0000000..2041268 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.cluster; + +public class NodeInformation { + + private final String hostname; + private final Integer siteToSitePort; + private final int apiPort; + private final boolean isSiteToSiteSecure; + private final int totalFlowFiles; + + public NodeInformation(final String hostname, final Integer siteToSitePort, final int apiPort, + final boolean isSiteToSiteSecure, final int totalFlowFiles) { + this.hostname = hostname; + this.siteToSitePort = siteToSitePort; + this.apiPort = apiPort; + this.isSiteToSiteSecure = isSiteToSiteSecure; + this.totalFlowFiles = totalFlowFiles; + } + + public String getHostname() { + return hostname; + } + + public int getAPIPort() { + return apiPort; + } + + public Integer getSiteToSitePort() { + return siteToSitePort; + } + + public boolean isSiteToSiteSecure() { + return isSiteToSiteSecure; + } + + public int getTotalFlowFiles() { + return totalFlowFiles; + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof NodeInformation)) { + return false; + } + + final NodeInformation other = (NodeInformation) obj; + if (!hostname.equals(other.hostname)) { + return false; + } + if (siteToSitePort == null && other.siteToSitePort != null) { + return false; + } + if (siteToSitePort != null && other.siteToSitePort == null) { + return false; + } else if (siteToSitePort != null && siteToSitePort.intValue() != other.siteToSitePort.intValue()) { + return false; + } + if (apiPort != other.apiPort) { + return false; + } + if (isSiteToSiteSecure != other.isSiteToSiteSecure) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return 83832 + hostname.hashCode() + (siteToSitePort == null ? 8 : siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 
3829 : 0); + } + + @Override + public String toString() { + return "Node[" + hostname + ":" + apiPort + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java new file mode 100644 index 0000000..440463c --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.cluster; + +import javax.xml.bind.annotation.adapters.XmlAdapter; + +import org.apache.nifi.remote.cluster.NodeInformation; + +public class NodeInformationAdapter extends XmlAdapter<AdaptedNodeInformation, NodeInformation> { + + @Override + public NodeInformation unmarshal(final AdaptedNodeInformation adapted) throws Exception { + return new NodeInformation(adapted.getHostname(), adapted.getSiteToSitePort(), adapted.getApiPort(), adapted.isSiteToSiteSecure(), adapted.getTotalFlowFiles()); + } + + @Override + public AdaptedNodeInformation marshal(final NodeInformation nodeInformation) throws Exception { + final AdaptedNodeInformation adapted = new AdaptedNodeInformation(); + adapted.setHostname(nodeInformation.getHostname()); + adapted.setSiteToSitePort(nodeInformation.getSiteToSitePort()); + adapted.setApiPort(nodeInformation.getAPIPort()); + adapted.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure()); + adapted.setTotalFlowFiles(nodeInformation.getTotalFlowFiles()); + return adapted; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java new file mode 100644 index 0000000..1380e1b --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.codec; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + +import org.apache.nifi.remote.VersionedRemoteResource; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.TransmissionDisabledException; +import org.apache.nifi.remote.protocol.DataPacket; + +/** + * <p> + * Provides a mechanism for encoding and decoding FlowFiles as streams so that + * they can be transferred remotely. + * </p> + */ +public interface FlowFileCodec extends VersionedRemoteResource { + + /** + * Returns a List of all versions that this codec is able to support, in the + * order that they are preferred by the codec + * + * @return + */ + public List<Integer> getSupportedVersions(); + + /** + * Encodes a DataPacket and its content as a single stream of data and writes + * that stream to the output. 
+ * + * @param dataPacket the data to serialize + * @param outStream the stream to write the data to + * + * @throws IOException if there is a communications issue + * @throws TransmissionDisabledException if a user terminates the connection + */ + void encode(DataPacket dataPacket, OutputStream outStream) throws IOException, TransmissionDisabledException; + + /** + * Decodes the contents of the InputStream, interpreting the data to + * determine the next DataPacket's attributes and content. + * + * @param stream an InputStream containing DataPacket's content and attributes + * + * @return the DataPacket that was created, or <code>null</code> if the stream + * was out of data + * + * @throws IOException + * @throws ProtocolException if the input is malformed + * @throws TransmissionDisabledException if a user terminates the connection + */ + DataPacket decode(InputStream stream) throws IOException, ProtocolException, TransmissionDisabledException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java new file mode 100644 index 0000000..6fd92de --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.codec; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.util.StandardDataPacket; +import org.apache.nifi.stream.io.StreamUtils; + +public class StandardFlowFileCodec implements FlowFileCodec { + public static final int MAX_NUM_ATTRIBUTES = 25000; + + public static final String DEFAULT_FLOWFILE_PATH = "./"; + + private final VersionNegotiator versionNegotiator; + + public StandardFlowFileCodec() { + versionNegotiator = new StandardVersionNegotiator(1); + } + + @Override + public void encode(final DataPacket dataPacket, final OutputStream encodedOut) throws IOException { + final DataOutputStream out = new DataOutputStream(encodedOut); + + final Map<String, String> attributes = dataPacket.getAttributes(); + out.writeInt(attributes.size()); + for ( final Map.Entry<String, String> entry : attributes.entrySet() ) { + writeString(entry.getKey(), out); + writeString(entry.getValue(), out); + } + + 
out.writeLong(dataPacket.getSize()); + + final InputStream in = dataPacket.getData(); + StreamUtils.copy(in, encodedOut); + encodedOut.flush(); + } + + + @Override + public DataPacket decode(final InputStream stream) throws IOException, ProtocolException { + final DataInputStream in = new DataInputStream(stream); + + final int numAttributes; + try { + numAttributes = in.readInt(); + } catch (final EOFException e) { + // we're out of data. + return null; + } + + // This is here because if the stream is not properly formed, we could get up to Integer.MAX_VALUE attributes, which will + // generally result in an OutOfMemoryError. + if ( numAttributes > MAX_NUM_ATTRIBUTES ) { + throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes); + } + + final Map<String, String> attributes = new HashMap<>(numAttributes); + for (int i=0; i < numAttributes; i++) { + final String attrName = readString(in); + final String attrValue = readString(in); + attributes.put(attrName, attrValue); + } + + final long numBytes = in.readLong(); + + return new StandardDataPacket(attributes, stream, numBytes); + } + + private void writeString(final String val, final DataOutputStream out) throws IOException { + final byte[] bytes = val.getBytes("UTF-8"); + out.writeInt(bytes.length); + out.write(bytes); + } + + + private String readString(final DataInputStream in) throws IOException { + final int numBytes = in.readInt(); + final byte[] bytes = new byte[numBytes]; + StreamUtils.fillBuffer(in, bytes, true); + return new String(bytes, "UTF-8"); + } + + @Override + public List<Integer> getSupportedVersions() { + return versionNegotiator.getSupportedVersions(); + } + + @Override + public VersionNegotiator getVersionNegotiator() { + return versionNegotiator; + } + + @Override + public String toString() { + return "Standard FlowFile Codec, Version " + versionNegotiator.getVersion(); + } + + @Override + public String getResourceName() { + return 
"StandardFlowFileCodec"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java new file mode 100644 index 0000000..b61fc65 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Checked exception that can be created either from a message or from an underlying cause.
 */
public class HandshakeException extends Exception {

    private static final long serialVersionUID = 178192341908726L;

    /**
     * @param cause the underlying failure
     */
    public HandshakeException(final Throwable cause) {
        super(cause);
    }

    /**
     * @param message the detail message
     */
    public HandshakeException(final String message) {
        super(message);
    }
}
/**
 * Checked exception carrying only a detail message.
 */
public class PortNotRunningException extends Exception {

    private static final long serialVersionUID = -2790940982005516375L;

    /**
     * @param message the detail message
     */
    public PortNotRunningException(final String message) {
        super(message);
    }
}
/**
 * {@link IOException} subtype that can be created from a message, a cause, or both.
 */
public class ProtocolException extends IOException {

    private static final long serialVersionUID = 5763900324505818495L;

    /**
     * @param message the detail message
     */
    public ProtocolException(final String message) {
        super(message);
    }

    /**
     * @param cause the underlying failure
     */
    public ProtocolException(final Throwable cause) {
        super(cause);
    }

    /**
     * @param message the detail message
     * @param cause the underlying failure
     */
    public ProtocolException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
/**
 * Checked exception carrying only a detail message.
 */
public class UnknownPortException extends Exception {

    private static final long serialVersionUID = -2790940982005516375L;

    /**
     * @param message the detail message
     */
    public UnknownPortException(final String message) {
        super(message);
    }
}
+ */ +package org.apache.nifi.remote.io.socket; + +import java.io.IOException; +import java.nio.channels.SocketChannel; + +import org.apache.nifi.remote.AbstractCommunicationsSession; + +public class SocketChannelCommunicationsSession extends AbstractCommunicationsSession { + private final SocketChannel channel; + private final SocketChannelInput request; + private final SocketChannelOutput response; + private int timeout = 30000; + + public SocketChannelCommunicationsSession(final SocketChannel socketChannel, final String uri) throws IOException { + super(uri); + request = new SocketChannelInput(socketChannel); + response = new SocketChannelOutput(socketChannel); + channel = socketChannel; + socketChannel.configureBlocking(false); + } + + @Override + public boolean isClosed() { + return !channel.isConnected(); + } + + @Override + public SocketChannelInput getInput() { + return request; + } + + @Override + public SocketChannelOutput getOutput() { + return response; + } + + @Override + public void setTimeout(final int millis) throws IOException { + request.setTimeout(millis); + response.setTimeout(millis); + this.timeout = millis; + } + + @Override + public int getTimeout() throws IOException { + return timeout; + } + + @Override + public void close() throws IOException { + channel.close(); + } + + @Override + public boolean isDataAvailable() { + return request.isDataAvailable(); + } + + @Override + public long getBytesWritten() { + return response.getBytesWritten(); + } + + @Override + public long getBytesRead() { + return request.getBytesRead(); + } + + @Override + public void interrupt() { + request.interrupt(); + response.interrupt(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java new file mode 100644 index 0000000..9e451fd --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.io.socket; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.SocketChannel; + +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.remote.io.InterruptableInputStream; +import org.apache.nifi.remote.protocol.CommunicationsInput; + +public class SocketChannelInput implements CommunicationsInput { + private final SocketChannelInputStream socketIn; + private final ByteCountingInputStream countingIn; + private final InputStream bufferedIn; + private final InterruptableInputStream interruptableIn; + + public SocketChannelInput(final SocketChannel socketChannel) throws IOException { + this.socketIn = new SocketChannelInputStream(socketChannel); + countingIn = new ByteCountingInputStream(socketIn); + bufferedIn = new BufferedInputStream(countingIn); + interruptableIn = new InterruptableInputStream(bufferedIn); + } + + @Override + public InputStream getInputStream() throws IOException { + return interruptableIn; + } + + public void setTimeout(final int millis) { + socketIn.setTimeout(millis); + } + + public boolean isDataAvailable() { + try { + return interruptableIn.available() > 0; + } catch (final Exception e) { + return false; + } + } + + @Override + public long getBytesRead() { + return countingIn.getBytesRead(); + } + + public void interrupt() { + interruptableIn.interrupt(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java new file mode 100644 index 
0000000..26c0164 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.SocketChannel; + +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.ByteCountingOutputStream; +import org.apache.nifi.remote.io.InterruptableOutputStream; +import org.apache.nifi.remote.protocol.CommunicationsOutput; + +public class SocketChannelOutput implements CommunicationsOutput { + private final SocketChannelOutputStream socketOutStream; + private final ByteCountingOutputStream countingOut; + private final OutputStream bufferedOut; + private final InterruptableOutputStream interruptableOut; + + public SocketChannelOutput(final SocketChannel socketChannel) throws IOException { + socketOutStream = new SocketChannelOutputStream(socketChannel); + countingOut = new ByteCountingOutputStream(socketOutStream); + bufferedOut = new BufferedOutputStream(countingOut); + interruptableOut = new InterruptableOutputStream(bufferedOut); + } + + 
@Override + public OutputStream getOutputStream() throws IOException { + return interruptableOut; + } + + public void setTimeout(final int timeout) { + socketOutStream.setTimeout(timeout); + } + + @Override + public long getBytesWritten() { + return countingOut.getBytesWritten(); + } + + public void interrupt() { + interruptableOut.interrupt(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java new file mode 100644 index 0000000..dca1d84 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.io.socket.ssl; + +import java.io.IOException; + +import org.apache.nifi.remote.AbstractCommunicationsSession; + +public class SSLSocketChannelCommunicationsSession extends AbstractCommunicationsSession { + private final SSLSocketChannel channel; + private final SSLSocketChannelInput request; + private final SSLSocketChannelOutput response; + + public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) { + super(uri); + request = new SSLSocketChannelInput(channel); + response = new SSLSocketChannelOutput(channel); + this.channel = channel; + } + + @Override + public SSLSocketChannelInput getInput() { + return request; + } + + @Override + public SSLSocketChannelOutput getOutput() { + return response; + } + + @Override + public void setTimeout(final int millis) throws IOException { + channel.setTimeout(millis); + } + + @Override + public int getTimeout() throws IOException { + return channel.getTimeout(); + } + + @Override + public void close() throws IOException { + channel.close(); + } + + @Override + public boolean isClosed() { + return channel.isClosed(); + } + + @Override + public boolean isDataAvailable() { + try { + return request.isDataAvailable(); + } catch (final Exception e) { + return false; + } + } + + @Override + public long getBytesWritten() { + return response.getBytesWritten(); + } + + @Override + public long getBytesRead() { + return request.getBytesRead(); + } + + @Override + public void interrupt() { + channel.interrupt(); + } + + @Override + public String toString() { + return super.toString() + "[SSLSocketChannel=" + channel + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java new file mode 100644 index 0000000..60ef33f --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.remote.io.socket.ssl; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.remote.protocol.CommunicationsInput; + +public class SSLSocketChannelInput implements CommunicationsInput { + private final SSLSocketChannelInputStream in; + private final ByteCountingInputStream countingIn; + private final InputStream bufferedIn; + + public SSLSocketChannelInput(final SSLSocketChannel socketChannel) { + in = new SSLSocketChannelInputStream(socketChannel); + countingIn = new ByteCountingInputStream(in); + this.bufferedIn = new BufferedInputStream(countingIn); + } + + @Override + public InputStream getInputStream() throws IOException { + return bufferedIn; + } + + public boolean isDataAvailable() throws IOException { + return bufferedIn.available() > 0; + } + + @Override + public long getBytesRead() { + return countingIn.getBytesRead(); + } +}