http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java deleted file mode 100644 index 8c23e28..0000000 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java +++ /dev/null @@ -1,835 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.client.socket; - -import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.channels.SocketChannel; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Pattern; - -import javax.net.ssl.SSLContext; -import javax.security.cert.CertificateExpiredException; -import javax.security.cert.CertificateNotYetValidException; - -import org.apache.nifi.events.EventReporter; -import org.apache.nifi.remote.Peer; -import org.apache.nifi.remote.PeerStatus; -import org.apache.nifi.remote.RemoteDestination; -import org.apache.nifi.remote.RemoteResourceInitiator; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.SiteToSiteClientConfig; -import org.apache.nifi.remote.cluster.ClusterNodeInformation; -import org.apache.nifi.remote.cluster.NodeInformation; -import org.apache.nifi.remote.codec.FlowFileCodec; -import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.PortNotRunningException; -import org.apache.nifi.remote.exception.ProtocolException; -import org.apache.nifi.remote.exception.TransmissionDisabledException; -import org.apache.nifi.remote.exception.UnknownPortException; -import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; -import org.apache.nifi.remote.protocol.CommunicationsSession; -import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; -import org.apache.nifi.remote.util.PeerStatusCache; -import org.apache.nifi.remote.util.RemoteNiFiUtils; -import org.apache.nifi.reporting.Severity; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.web.api.dto.ControllerDTO; -import org.apache.nifi.web.api.dto.PortDTO; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EndpointConnectionStatePool { - public static final long PEER_REFRESH_PERIOD = 60000L; - public static final String CATEGORY = "Site-to-Site"; - public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); - - private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); - - private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionStatePool.class); - - private final BlockingQueue<EndpointConnectionState> connectionStateQueue = new LinkedBlockingQueue<>(); - private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>(); - private final URI clusterUrl; - private final String apiUri; - - private final AtomicLong peerIndex = new AtomicLong(0L); - - private final ReentrantLock peerRefreshLock = new ReentrantLock(); - private volatile List<PeerStatus> peerStatuses; - private volatile long peerRefreshTime = 0L; - private volatile PeerStatusCache peerStatusCache; - private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>(); - - private final File peersFile; - private final EventReporter eventReporter; - private final SSLContext sslContext; - private final ScheduledExecutorService taskExecutor; - - private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock(); - private final Lock remoteInfoReadLock = listeningPortRWLock.readLock(); - private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock(); - private Integer siteToSitePort; - private Boolean siteToSiteSecure; - private long remoteRefreshTime; - private final Map<String, String> inputPortMap = new HashMap<>(); // map input port name to identifier - private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier - - private volatile int commsTimeout; - - public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final EventReporter eventReporter, final File persistenceFile) { - this(clusterUrl, commsTimeoutMillis, null, eventReporter, persistenceFile); - } - - public EndpointConnectionStatePool(final String clusterUrl, final int commsTimeoutMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) { - try { - this.clusterUrl = new URI(clusterUrl); - } catch (final URISyntaxException e) { - throw new IllegalArgumentException("Invalid Cluster URL: " + clusterUrl); - } - - // Trim the trailing / - String uriPath = this.clusterUrl.getPath(); - if (uriPath.endsWith("/")) { - uriPath = uriPath.substring(0, uriPath.length() - 1); - } - apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api"; - - this.sslContext = sslContext; - this.peersFile = persistenceFile; - this.eventReporter = eventReporter; - this.commsTimeout = commsTimeoutMillis; - - Set<PeerStatus> recoveredStatuses; - if ( persistenceFile != null && persistenceFile.exists() ) { - try { - recoveredStatuses = recoverPersistedPeerStatuses(peersFile); - this.peerStatusCache = new PeerStatusCache(recoveredStatuses, peersFile.lastModified()); - } catch (final IOException ioe) { - logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe); - } - } else { - peerStatusCache = null; - } - - // Initialize a scheduled executor and run some maintenance tasks in the background to kill off old, unused - // connections and keep our list of peers up-to-date. - taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { - private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); - - @Override - public Thread newThread(final Runnable r) { - final Thread thread = defaultFactory.newThread(r); - thread.setName("NiFi Site-to-Site Connection Pool Maintenance"); - return thread; - } - }); - - taskExecutor.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - refreshPeers(); - } - }, 0, 5, TimeUnit.SECONDS); - - taskExecutor.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - cleanupExpiredSockets(); - } - }, 5, 5, TimeUnit.SECONDS); - } - - - public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { - return getEndpointConnectionState(remoteDestination, direction, null); - } - - - - public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { - // - // Attempt to get a connection state that already exists for this URL. - // - FlowFileCodec codec = null; - CommunicationsSession commsSession = null; - SocketClientProtocol protocol = null; - EndpointConnectionState connectionState; - Peer peer = null; - - final List<EndpointConnectionState> addBack = new ArrayList<>(); - try { - do { - final PeerStatus peerStatus = getNextPeerStatus(direction); - if ( peerStatus == null ) { - return null; - } - - connectionState = connectionStateQueue.poll(); - logger.debug("{} Connection State for {} = {}", this, clusterUrl, connectionState); - - if ( connectionState == null && !addBack.isEmpty() ) { - // all available connections have been penalized. - return null; - } - - if ( connectionState != null && connectionState.getPeer().isPenalized() ) { - // we have a connection, but it's penalized. We want to add it back to the queue - // when we've found one to use. - addBack.add(connectionState); - continue; - } - - // if we can't get an existing ConnectionState, create one - if ( connectionState == null ) { - protocol = new SocketClientProtocol(); - protocol.setDestination(remoteDestination); - - try { - commsSession = establishSiteToSiteConnection(peerStatus); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - try { - RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos); - } catch (final HandshakeException e) { - try { - commsSession.close(); - } catch (final IOException ioe) { - throw e; - } - } - } catch (final IOException e) { - } - - - final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort(); - peer = new Peer(commsSession, peerUrl, clusterUrl.toString()); - - // set properties based on config - if ( config != null ) { - protocol.setTimeout((int) config.getTimeout(TimeUnit.MILLISECONDS)); - protocol.setPreferredBatchCount(config.getPreferredBatchCount()); - protocol.setPreferredBatchSize(config.getPreferredBatchSize()); - protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS)); - } - - // perform handshake - try { - protocol.handshake(peer); - - // handle error cases - if ( protocol.isDestinationFull() ) { - logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer); - penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); - connectionStateQueue.offer(connectionState); - continue; - } else if ( protocol.isPortInvalid() ) { - penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); - cleanup(protocol, peer); - throw new PortNotRunningException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not running"); - } else if ( protocol.isPortUnknown() ) { - penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); - cleanup(protocol, peer); - throw new UnknownPortException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not known"); - } - - // negotiate the FlowFileCodec to use - codec = protocol.negotiateCodec(peer); - } catch (final PortNotRunningException | UnknownPortException e) { - throw e; - } catch (final Exception e) { - penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); - cleanup(protocol, peer); - - final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString()); - logger.error(message); - if ( logger.isDebugEnabled() ) { - logger.error("", e); - } - throw e; - } - - connectionState = new EndpointConnectionState(peer, protocol, codec); - } else { - final long lastTimeUsed = connectionState.getLastTimeUsed(); - final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed; - - if ( commsTimeout > 0L && millisSinceLastUse >= commsTimeout ) { - cleanup(connectionState.getSocketClientProtocol(), connectionState.getPeer()); - connectionState = null; - } else { - codec = connectionState.getCodec(); - peer = connectionState.getPeer(); - commsSession = peer.getCommunicationsSession(); - protocol = connectionState.getSocketClientProtocol(); - } - } - } while ( connectionState == null || codec == null || commsSession == null || protocol == null ); - } finally { - if ( !addBack.isEmpty() ) { - connectionStateQueue.addAll(addBack); - } - } - - return connectionState; - } - - - public boolean offer(final EndpointConnectionState endpointConnectionState) { - final Peer peer = endpointConnectionState.getPeer(); - if ( peer == null ) { - return false; - } - - final String url = peer.getUrl(); - if ( url == null ) { - return false; - } - - return connectionStateQueue.offer(endpointConnectionState); - } - - /** - * Updates internal state map to penalize a PeerStatus that points to the specified peer - * @param peer - */ - public void penalize(final Peer peer, final long penalizationMillis) { - String host; - int port; - try { - final URI uri = new URI(peer.getUrl()); - host = uri.getHost(); - port = uri.getPort(); - } catch (final URISyntaxException e) { - host = peer.getHost(); - port = -1; - } - - final PeerStatus status = new PeerStatus(host, port, true, 1); - Long expiration = peerTimeoutExpirations.get(status); - if ( expiration == null ) { - expiration = Long.valueOf(0L); - } - - final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis); - peerTimeoutExpirations.put(status, Long.valueOf(newExpiration)); - } - - private void cleanup(final SocketClientProtocol protocol, final Peer peer) { - if ( protocol != null && peer != null ) { - try { - protocol.shutdown(peer); - } catch (final TransmissionDisabledException e) { - // User disabled transmission.... do nothing. - logger.debug(this + " Transmission Disabled by User"); - } catch (IOException e1) { - } - } - - if ( peer != null ) { - try { - peer.close(); - } catch (final TransmissionDisabledException e) { - // User disabled transmission.... do nothing. - logger.debug(this + " Transmission Disabled by User"); - } catch (IOException e1) { - } - } - } - - private PeerStatus getNextPeerStatus(final TransferDirection direction) { - List<PeerStatus> peerList = peerStatuses; - if ( (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) && peerRefreshLock.tryLock() ) { - try { - try { - peerList = createPeerStatusList(direction); - } catch (final Exception e) { - final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString()); - logger.warn(message); - if ( logger.isDebugEnabled() ) { - logger.warn("", e); - } - - if ( eventReporter != null ) { - eventReporter.reportEvent(Severity.WARNING, CATEGORY, message); - } - } - - this.peerStatuses = peerList; - peerRefreshTime = System.currentTimeMillis(); - } finally { - peerRefreshLock.unlock(); - } - } - - if ( peerList == null || peerList.isEmpty() ) { - return null; - } - - PeerStatus peerStatus; - for (int i=0; i < peerList.size(); i++) { - final long idx = peerIndex.getAndIncrement(); - final int listIndex = (int) (idx % peerList.size()); - peerStatus = peerList.get(listIndex); - - if ( isPenalized(peerStatus) ) { - logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus); - } else { - return peerStatus; - } - } - - logger.debug("{} All peers appear to be penalized; returning null", this); - return null; - } - - private boolean isPenalized(final PeerStatus peerStatus) { - final Long expirationEnd = peerTimeoutExpirations.get(peerStatus); - return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis() ); - } - - private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException { - final Set<PeerStatus> statuses = getPeerStatuses(); - if ( statuses == null ) { - return new ArrayList<>(); - } - - final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); - final List<NodeInformation> nodeInfos = new ArrayList<>(); - for ( final PeerStatus peerStatus : statuses ) { - final NodeInformation nodeInfo = new NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, peerStatus.isSecure(), peerStatus.getFlowFileCount()); - nodeInfos.add(nodeInfo); - } - clusterNodeInfo.setNodeInformation(nodeInfos); - return formulateDestinationList(clusterNodeInfo, direction); - } - - - private Set<PeerStatus> getPeerStatuses() { - final PeerStatusCache cache = this.peerStatusCache; - if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) { - return null; - } - - if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) { - final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size()); - for (final PeerStatus status : cache.getStatuses()) { - final PeerStatus equalizedStatus = new PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1); - equalizedSet.add(equalizedStatus); - } - - return equalizedSet; - } - - return cache.getStatuses(); - } - - private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException { - final String hostname = clusterUrl.getHost(); - final int port = getSiteToSitePort(); - - final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port); - final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString()); - final SocketClientProtocol clientProtocol = new SocketClientProtocol(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, dis, dos); - - clientProtocol.setTimeout(commsTimeout); - clientProtocol.handshake(peer, null); - final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer); - persistPeerStatuses(peerStatuses); - - try { - clientProtocol.shutdown(peer); - } catch (final IOException e) { - final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString()); - logger.warn(message); - if (logger.isDebugEnabled()) { - logger.warn("", e); - } - } - - try { - peer.close(); - } catch (final IOException e) { - final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString()); - logger.warn(message); - if (logger.isDebugEnabled()) { - logger.warn("", e); - } - } - - return peerStatuses; - } - - - private void persistPeerStatuses(final Set<PeerStatus> statuses) { - if ( peersFile == null ) { - return; - } - - try (final OutputStream fos = new FileOutputStream(peersFile); - final OutputStream out = new BufferedOutputStream(fos)) { - - for (final PeerStatus status : statuses) { - final String line = status.getHostname() + ":" + status.getPort() + ":" + status.isSecure() + "\n"; - out.write(line.getBytes(StandardCharsets.UTF_8)); - } - - } catch (final IOException e) { - logger.error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString(), e); - } - } - - private Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException { - if (!file.exists()) { - return null; - } - - final Set<PeerStatus> statuses = new HashSet<>(); - try (final InputStream fis = new FileInputStream(file); - final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) { - - String line; - while ((line = reader.readLine()) != null) { - final String[] splits = line.split(Pattern.quote(":")); - if (splits.length != 3) { - continue; - } - - final String hostname = splits[0]; - final int port = Integer.parseInt(splits[1]); - final boolean secure = Boolean.parseBoolean(splits[2]); - - statuses.add(new PeerStatus(hostname, port, secure, 1)); - } - } - - return statuses; - } - - - private CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException { - return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort()); - } - - private CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port) throws IOException { - if ( siteToSiteSecure == null ) { - throw new IOException("Remote NiFi instance " + clusterUrl + " is not currently configured to accept site-to-site connections"); - } - - final String destinationUri = "nifi://" + hostname + ":" + port; - - CommunicationsSession commsSession = null; - try { - if ( siteToSiteSecure ) { - if ( sslContext == null ) { - throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications"); - } - - final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true); - socketChannel.connect(); - - commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri); - - try { - commsSession.setUserDn(socketChannel.getDn()); - } catch (final CertificateNotYetValidException | CertificateExpiredException ex) { - throw new IOException(ex); - } - } else { - final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); - commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri); - } - - commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES); - commsSession.setUri(destinationUri); - } catch (final IOException ioe) { - if ( commsSession != null ) { - commsSession.close(); - } - - throw ioe; - } - - return commsSession; - } - - - static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) { - final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation(); - final int numDestinations = Math.max(128, nodeInfoSet.size()); - final Map<NodeInformation, Integer> entryCountMap = new HashMap<>(); - - long totalFlowFileCount = 0L; - for (final NodeInformation nodeInfo : nodeInfoSet) { - totalFlowFileCount += nodeInfo.getTotalFlowFiles(); - } - - int totalEntries = 0; - for (final NodeInformation nodeInfo : nodeInfoSet) { - final int flowFileCount = nodeInfo.getTotalFlowFiles(); - // don't allow any node to get more than 80% of the data - final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount)); - final double relativeWeighting = (direction == TransferDirection.RECEIVE) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles; - final int entries = Math.max(1, (int) (numDestinations * relativeWeighting)); - - entryCountMap.put(nodeInfo, Math.max(1, entries)); - totalEntries += entries; - } - - final List<PeerStatus> destinations = new ArrayList<>(totalEntries); - for (int i=0; i < totalEntries; i++) { - destinations.add(null); - } - for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) { - final NodeInformation nodeInfo = entry.getKey(); - final int numEntries = entry.getValue(); - - int skipIndex = numEntries; - for (int i=0; i < numEntries; i++) { - int n = (skipIndex * i); - while (true) { - final int index = n % destinations.size(); - PeerStatus status = destinations.get(index); - if ( status == null ) { - status = new PeerStatus(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), nodeInfo.getTotalFlowFiles()); - destinations.set(index, status); - break; - } else { - n++; - } - } - } - } - - final StringBuilder distributionDescription = new StringBuilder(); - distributionDescription.append("New Weighted Distribution of Nodes:"); - for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) { - final double percentage = entry.getValue() * 100D / (double) destinations.size(); - distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of FlowFiles"); - } - logger.info(distributionDescription.toString()); - - // Jumble the list of destinations. - return destinations; - } - - - private void cleanupExpiredSockets() { - final List<EndpointConnectionState> states = new ArrayList<>(); - - EndpointConnectionState state; - while ((state = connectionStateQueue.poll()) != null) { - // If the socket has not been used in 10 seconds, shut it down. - final long lastUsed = state.getLastTimeUsed(); - if ( lastUsed < System.currentTimeMillis() - 10000L ) { - try { - state.getSocketClientProtocol().shutdown(state.getPeer()); - } catch (final Exception e) { - logger.debug("Failed to shut down {} using {} due to {}", - new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} ); - } - - cleanup(state.getSocketClientProtocol(), state.getPeer()); - } else { - states.add(state); - } - } - - connectionStateQueue.addAll(states); - } - - public void shutdown() { - taskExecutor.shutdown(); - peerTimeoutExpirations.clear(); - - for ( final CommunicationsSession commsSession : activeCommsChannels ) { - commsSession.interrupt(); - } - - EndpointConnectionState state; - while ( (state = connectionStateQueue.poll()) != null) { - cleanup(state.getSocketClientProtocol(), state.getPeer()); - } - } - - public void terminate(final EndpointConnectionState state) { - cleanup(state.getSocketClientProtocol(), state.getPeer()); - } - - private void refreshPeers() { - final PeerStatusCache existingCache = peerStatusCache; - if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) { - return; - } - - try { - final Set<PeerStatus> statuses = fetchRemotePeerStatuses(); - peerStatusCache = new PeerStatusCache(statuses); - logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size()); - } catch (Exception e) { - logger.warn("{} Unable to refresh Remote Group's peers due to {}", this, e); - if (logger.isDebugEnabled()) { - logger.warn("", e); - } - } - } - - - public String getInputPortIdentifier(final String portName) throws IOException { - return getPortIdentifier(portName, inputPortMap); - } - - public String getOutputPortIdentifier(final String portName) throws IOException { - return getPortIdentifier(portName, outputPortMap); - } - - - private String getPortIdentifier(final String portName, final Map<String, String> portMap) throws IOException { - String identifier; - remoteInfoReadLock.lock(); - try { - identifier = portMap.get(portName); - } finally { - remoteInfoReadLock.unlock(); - } - - if ( identifier != null ) { - return identifier; - } - - refreshRemoteInfo(); - - remoteInfoReadLock.lock(); - try { - return portMap.get(portName); - } finally { - remoteInfoReadLock.unlock(); - } - } - - - private ControllerDTO refreshRemoteInfo() throws IOException { - final boolean webInterfaceSecure = clusterUrl.toString().startsWith("https"); - final RemoteNiFiUtils utils = new RemoteNiFiUtils(webInterfaceSecure ? sslContext : null); - final ControllerDTO controller = utils.getController(URI.create(apiUri + "/controller"), commsTimeout); - - remoteInfoWriteLock.lock(); - try { - this.siteToSitePort = controller.getRemoteSiteListeningPort(); - this.siteToSiteSecure = controller.isSiteToSiteSecure(); - - inputPortMap.clear(); - for (final PortDTO inputPort : controller.getInputPorts()) { - inputPortMap.put(inputPort.getName(), inputPort.getId()); - } - - outputPortMap.clear(); - for ( final PortDTO outputPort : controller.getOutputPorts()) { - outputPortMap.put(outputPort.getName(), outputPort.getId()); - } - - this.remoteRefreshTime = System.currentTimeMillis(); - } finally { - remoteInfoWriteLock.unlock(); - } - - return controller; - } - - /** - * @return the port that the remote instance is listening on for - * site-to-site communication, or <code>null</code> if the remote instance - * is not configured to allow site-to-site communications. - * - * @throws IOException if unable to communicate with the remote instance - */ - private Integer getSiteToSitePort() throws IOException { - Integer listeningPort; - remoteInfoReadLock.lock(); - try { - listeningPort = this.siteToSitePort; - if (listeningPort != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) { - return listeningPort; - } - } finally { - remoteInfoReadLock.unlock(); - } - - final ControllerDTO controller = refreshRemoteInfo(); - listeningPort = controller.getRemoteSiteListeningPort(); - - return listeningPort; - } - - /** - * Returns {@code true} if the remote instance is configured for secure site-to-site communications, - * {@code false} otherwise. - * - * @return - * @throws IOException - */ - public boolean isSecure() throws IOException { - remoteInfoReadLock.lock(); - try { - final Boolean secure = this.siteToSiteSecure; - if (secure != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) { - return secure; - } - } finally { - remoteInfoReadLock.unlock(); - } - - final ControllerDTO controller = refreshRemoteInfo(); - return controller.isSiteToSiteSecure(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index 0494d04..6fa934b 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -24,23 +24,23 @@ import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClientConfig; -import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.PortNotRunningException; -import org.apache.nifi.remote.exception.ProtocolException; -import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.util.ObjectHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SocketClient implements SiteToSiteClient { + private static final Logger logger = LoggerFactory.getLogger(SocketClient.class); + private final SiteToSiteClientConfig config; - private final EndpointConnectionStatePool pool; + private final EndpointConnectionPool pool; private final boolean compress; private final String portName; private final long penalizationNanos; private volatile String portIdentifier; public SocketClient(final SiteToSiteClientConfig config) { - pool = new EndpointConnectionStatePool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS), + pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS), config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile()); this.config = config; @@ -66,44 +66,55 @@ public class SocketClient implements SiteToSiteClient { return id; } + final String portId; if ( direction == TransferDirection.SEND ) { - return pool.getInputPortIdentifier(this.portName); + portId = pool.getInputPortIdentifier(this.portName); } else { - return pool.getOutputPortIdentifier(this.portName); + portId = pool.getOutputPortIdentifier(this.portName); } + + if (portId == null) { + logger.debug("Unable to resolve port [{}] to an identifier", portName); + } else { + logger.debug("Resolved port [{}] to identifier [{}]", portName, portId); + } + + return portId; } + private RemoteDestination createRemoteDestination(final String portId) { + return new RemoteDestination() { + @Override + public String getIdentifier() { + return portId; + } + + @Override + public long getYieldPeriod(final TimeUnit timeUnit) { + return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS); + } + + @Override + public boolean isUseCompression() { + return compress; + } + }; + } + @Override public Transaction createTransaction(final TransferDirection direction) throws IOException { - final String portId = getPortIdentifier(TransferDirection.SEND); + final String portId = getPortIdentifier(direction); if ( portId == null ) { - throw new IOException("Could not find Port with name " + portName + " for remote NiFi instance"); + throw new IOException("Could not find Port with name '" + portName + "' for remote NiFi instance"); } - final RemoteDestination remoteDestination = new RemoteDestination() { - @Override - public String getIdentifier() { - return portId; - } - - @Override - public long getYieldPeriod(final TimeUnit timeUnit) { - return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS); - } - - @Override - public boolean isUseCompression() { - return compress; - } - }; + final RemoteDestination remoteDestination = createRemoteDestination(portId); - final EndpointConnectionState connectionState; - try { - connectionState = pool.getEndpointConnectionState(remoteDestination, direction); - } catch (final ProtocolException | HandshakeException | PortNotRunningException | UnknownPortException e) { - throw new IOException(e); + final EndpointConnection connectionState = pool.getEndpointConnection(remoteDestination, direction, getConfig()); + if ( connectionState == null ) { + return null; } final Transaction transaction = connectionState.getSocketClientProtocol().startTransaction( @@ -111,7 +122,7 @@ public class SocketClient implements SiteToSiteClient { // Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever // the transaction is either completed or canceled. - final ObjectHolder<EndpointConnectionState> connectionStateRef = new ObjectHolder<>(connectionState); + final ObjectHolder<EndpointConnection> connectionStateRef = new ObjectHolder<>(connectionState); return new Transaction() { @Override public void confirm() throws IOException { @@ -119,11 +130,16 @@ public class SocketClient implements SiteToSiteClient { } @Override + public void complete() throws IOException { + complete(false); + } + + @Override public void complete(final boolean requestBackoff) throws IOException { try { transaction.complete(requestBackoff); } finally { - final EndpointConnectionState state = connectionStateRef.get(); + final EndpointConnection state = connectionStateRef.get(); if ( state != null ) { pool.offer(connectionState); connectionStateRef.set(null); @@ -136,7 +152,7 @@ public class SocketClient implements SiteToSiteClient { try { transaction.cancel(explanation); } finally { - final EndpointConnectionState state = connectionStateRef.get(); + final EndpointConnection state = connectionStateRef.get(); if ( state != null ) { pool.terminate(connectionState); connectionStateRef.set(null); @@ -149,7 +165,7 @@ public class SocketClient implements SiteToSiteClient { try { transaction.error(); } finally { - final EndpointConnectionState state = connectionStateRef.get(); + final EndpointConnection state = connectionStateRef.get(); if ( state != null ) { pool.terminate(connectionState); connectionStateRef.set(null); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java index b61fc65..d4d55e1 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java @@ -16,8 +16,15 @@ */ package org.apache.nifi.remote.exception; -public class HandshakeException extends Exception { +import java.io.IOException; + +/** + * A HandshakeException occurs when the client and the remote NiFi instance do not agree + * on some condition during the handshake. For example, if the NiFi instance does not recognize + * one of the parameters that the client passes during the Handshaking phase. + */ +public class HandshakeException extends IOException { private static final long serialVersionUID = 178192341908726L; public HandshakeException(final String message) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java index af0f467..8b97832 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java @@ -16,8 +16,12 @@ */ package org.apache.nifi.remote.exception; -public class PortNotRunningException extends Exception { - +/** + * PortNotRunningException occurs when the remote NiFi instance reports + * that the Port that the client is attempting to communicate with is not + * currently running and therefore communications with that Port are not allowed. + */ +public class PortNotRunningException extends ProtocolException { private static final long serialVersionUID = -2790940982005516375L; public PortNotRunningException(final String message) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java index e12348a..45a4e15 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java @@ -18,6 +18,10 @@ package org.apache.nifi.remote.exception; import java.io.IOException; +/** + * A ProtocolException occurs when unexpected data is received, for example + * an invalid Response Code. + */ public class ProtocolException extends IOException { private static final long serialVersionUID = 5763900324505818495L; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java index e6a0fe7..592a1b3 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java @@ -16,8 +16,11 @@ */ package org.apache.nifi.remote.exception; -public class UnknownPortException extends Exception { - +/** + * An UnknownPortException indicates that the remote NiFi instance has reported that + * the endpoint that the client attempted to communicate with does not exist. + */ +public class UnknownPortException extends ProtocolException { private static final long serialVersionUID = -2790940982005516375L; public UnknownPortException(final String message) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java index 9e451fd..7dffddd 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java @@ -63,4 +63,9 @@ public class SocketChannelInput implements CommunicationsInput { public void interrupt() { interruptableIn.interrupt(); } + + @Override + public void consume() throws IOException { + socketIn.consume(); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java index 60ef33f..01fb9f2 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java @@ -47,4 +47,9 @@ public class SSLSocketChannelInput implements CommunicationsInput { public long getBytesRead() { return countingIn.getBytesRead(); } + + @Override + public void consume() throws IOException { + in.consume(); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java index befbdaa..36a0e8d 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java @@ -40,9 +40,9 @@ public interface ClientProtocol extends VersionedRemoteResource { FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException; - void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; + int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; - void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; + int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; void shutdown(Peer peer) throws IOException, ProtocolException; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java index d2e2946..5e56902 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java @@ -21,6 +21,12 @@ import java.io.InputStream; public interface CommunicationsInput { + /** + * Reads all data currently on the socket and throws it away + * @throws IOException + */ + void consume() throws IOException; + InputStream getInputStream() throws IOException; long getBytesRead(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index 5f194f8..e321663 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -150,6 +150,7 @@ public class SocketClientProtocol implements ClientProtocol { } } + logger.debug("Handshaking with properties {}", properties); dos.writeInt(properties.size()); for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) { dos.writeUTF(entry.getKey().name()); @@ -269,13 +270,13 @@ public class SocketClientProtocol implements ClientProtocol { throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse); } - return new SocketClientTransaction(versionNegotiator.getVersion(), peer, codec, + return new SocketClientTransaction(versionNegotiator.getVersion(), destination.getIdentifier(), peer, codec, direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS)); } @Override - public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { final String userDn = peer.getCommunicationsSession().getUserDn(); final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE); @@ -288,7 +289,7 @@ public class SocketClientProtocol implements ClientProtocol { final DataPacket dataPacket = transaction.receive(); if ( dataPacket == null ) { if ( flowFilesReceived.isEmpty() ) { - peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS)); + peer.penalize(destination.getIdentifier(), destination.getYieldPeriod(TimeUnit.MILLISECONDS)); } break; } @@ -322,25 +323,25 @@ public class SocketClientProtocol implements ClientProtocol { transaction.complete(applyBackpressure); logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); - if ( flowFilesReceived.isEmpty() ) { - return; + if ( !flowFilesReceived.isEmpty() ) { + stopWatch.stop(); + final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; + final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesReceived); + logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate }); } - stopWatch.stop(); - final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; - final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesReceived); - logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate }); + return flowFilesReceived.size(); } @Override - public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { FlowFile flowFile = session.get(); if (flowFile == null) { - return; + return 0; } try { @@ -401,6 +402,8 @@ public class SocketClientProtocol implements ClientProtocol { final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + + return flowFilesSent.size(); } catch (final Exception e) { session.rollback(); throw e; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java index edb360e..cf8f9b2 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java @@ -19,6 +19,7 @@ package org.apache.nifi.remote.protocol.socket; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.zip.CRC32; import java.util.zip.CheckedInputStream; @@ -29,6 +30,8 @@ import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.io.CompressionInputStream; +import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.RequestType; import org.slf4j.Logger; @@ -47,14 +50,16 @@ public class SocketClientTransaction implements Transaction { private final boolean compress; private final Peer peer; private final int penaltyMillis; + private final String destinationId; private boolean dataAvailable = false; private int transfers = 0; private TransactionState state; - SocketClientTransaction(final int protocolVersion, final Peer peer, final FlowFileCodec codec, + SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec, final TransferDirection direction, final boolean useCompression, final int penaltyMillis) throws IOException { this.protocolVersion = protocolVersion; + this.destinationId = destinationId; this.peer = peer; this.codec = codec; this.direction = direction; @@ -140,7 +145,8 @@ public class SocketClientTransaction implements Transaction { } logger.debug("{} Receiving data from {}", this, peer); - final DataPacket packet = codec.decode(new CheckedInputStream(dis, crc)); + final InputStream dataIn = compress ? new CompressionInputStream(dis) : dis; + final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc)); if ( packet == null ) { this.dataAvailable = false; @@ -174,7 +180,8 @@ public class SocketClientTransaction implements Transaction { logger.debug("{} Sending data to {}", this, peer); - final OutputStream out = new CheckedOutputStream(dos, crc); + final OutputStream dataOut = compress ? new CompressionOutputStream(dos) : dos; + final OutputStream out = new CheckedOutputStream(dataOut, crc); codec.encode(dataPacket, out); // need to close the CompressionOutputStream in order to force it write out any remaining bytes. @@ -208,6 +215,10 @@ public class SocketClientTransaction implements Transaction { } } + @Override + public void complete() throws IOException { + complete(false); + } @Override public void complete(boolean requestBackoff) throws IOException { @@ -246,7 +257,7 @@ public class SocketClientTransaction implements Transaction { logger.debug("{} Received {} from {}", this, transactionResponse, peer); if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { - peer.penalize(penaltyMillis); + peer.penalize(destinationId, penaltyMillis); } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java index d8899ea..275e40c 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java @@ -39,7 +39,7 @@ public class TestEndpointConnectionStatePool { collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096)); clusterNodeInfo.setNodeInformation(collection); - final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); for ( final PeerStatus peerStatus : destinations ) { System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); } @@ -53,7 +53,7 @@ public class TestEndpointConnectionStatePool { collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000)); clusterNodeInfo.setNodeInformation(collection); - final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); for ( final PeerStatus peerStatus : destinations ) { System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); } @@ -73,7 +73,7 @@ public class TestEndpointConnectionStatePool { collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096)); clusterNodeInfo.setNodeInformation(collection); - final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); for ( final PeerStatus peerStatus : destinations ) { System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); } @@ -87,7 +87,7 @@ public class TestEndpointConnectionStatePool { collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000)); clusterNodeInfo.setNodeInformation(collection); - final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + final List<PeerStatus> destinations = EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); for ( final PeerStatus peerStatus : destinations ) { System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java new file mode 100644 index 0000000..a744905 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.socket; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.util.StandardDataPacket; +import org.apache.nifi.stream.io.StreamUtils; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +public class TestSiteToSiteClient { + + @Test + @Ignore("For local testing only; not really a unit test but a manual test") + public void testReceive() throws IOException { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); + + final SiteToSiteClient client = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("out") + .requestBatchCount(1) + .build(); + + try { + final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); + Assert.assertNotNull(transaction); + + final DataPacket packet = transaction.receive(); + Assert.assertNotNull(packet); + + final InputStream in = packet.getData(); + final long size = packet.getSize(); + final byte[] buff = new byte[(int) size]; + + StreamUtils.fillBuffer(in, buff); + System.out.println(buff.length); + + Assert.assertNull(transaction.receive()); + + transaction.confirm(); + transaction.complete(false); + } finally { + client.close(); + } + } + + + @Test + @Ignore("For local testing only; not really a unit test but a manual test") + public void testSend() throws IOException { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); + + final SiteToSiteClient client = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("in") + .build(); + + try { + final Transaction transaction = client.createTransaction(TransferDirection.SEND); + Assert.assertNotNull(transaction); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put("site-to-site", "yes, please!"); + final byte[] bytes = "Hello".getBytes(); + final ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length); + transaction.send(packet); + + transaction.confirm(); + transaction.complete(false); + } finally { + client.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java index 32a3f26..f68c874 100644 --- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java +++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java @@ -43,6 +43,16 @@ public class SocketChannelInputStream extends InputStream { public void setTimeout(final int timeoutMillis) { this.timeoutMillis = timeoutMillis; } + + public void consume() throws IOException { + final byte[] b = new byte[4096]; + final ByteBuffer buffer = ByteBuffer.wrap(b); + int bytesRead; + do { + bytesRead = channel.read(buffer); + buffer.flip(); + } while ( bytesRead > 0 ); + } @Override public int read() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java index 5810488..7c74b20 100644 --- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java +++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java @@ -258,6 +258,16 @@ public class SSLSocketChannel implements Closeable { } } + public void consume() throws IOException { + final byte[] b = new byte[4096]; + final ByteBuffer buffer = ByteBuffer.wrap(b); + int readCount; + do { + readCount = channel.read(buffer); + buffer.flip(); + } while (readCount > 0); + } + private int readData(final ByteBuffer dest) throws IOException { final long startTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java index 154bd08..6fb79d4 100644 --- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java +++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java @@ -27,6 +27,10 @@ public class SSLSocketChannelInputStream extends InputStream { this.channel = channel; } + public void consume() throws IOException { + channel.consume(); + } + @Override public int read() throws IOException { return channel.read(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index ac41cba..c842195 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -24,9 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.connectable.Position; import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.events.EventReporter; -import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.RemoteGroupPort; -import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool; public interface RemoteProcessGroup { @@ -81,8 +79,6 @@ public interface RemoteProcessGroup { String getYieldDuration(); - EndpointConnectionStatePool getConnectionPool(); - /** * Sets the timeout using the TimePeriod format (e.g., "30 secs", "1 min") * http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 7cb2874..54f0807 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.net.ssl.SSLContext; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.admin.service.UserService; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnRemoved; @@ -128,14 +129,12 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; import org.apache.nifi.groups.StandardProcessGroup; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.logging.LogRepository; import org.apache.nifi.logging.LogRepositoryFactory; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.logging.ProcessorLogObserver; import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.nar.NarClassLoader; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.nar.NarThreadContextClassLoader; import org.apache.nifi.processor.Processor; @@ -165,6 +164,7 @@ import org.apache.nifi.reporting.EventAccess; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; @@ -184,7 +184,6 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index db0aeb7..79ef7a8 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -56,7 +56,6 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; -import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool; import org.apache.nifi.remote.util.RemoteNiFiUtils; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.Severity; @@ -130,7 +129,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private volatile String authorizationIssue; - private final EndpointConnectionStatePool endpointConnectionPool; private final ScheduledExecutorService backgroundThreadExecutor; public StandardRemoteProcessGroup(final String id, final String targetUri, final ProcessGroup processGroup, @@ -172,13 +170,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } }; - endpointConnectionPool = new EndpointConnectionStatePool(getTargetUri().toString(), getCommunicationsTimeout(TimeUnit.MILLISECONDS), - sslContext, eventReporter, getPeerPersistenceFile()); - final Runnable checkAuthorizations = new InitializationTask(); - backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + id + ": " + targetUri); - backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 0L, 30L, TimeUnit.SECONDS); + backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 5L, 30L, TimeUnit.SECONDS); } @Override @@ -200,7 +194,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { @Override public void shutdown() { backgroundThreadExecutor.shutdown(); - endpointConnectionPool.shutdown(); } @Override @@ -1222,11 +1215,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } @Override - public EndpointConnectionStatePool getConnectionPool() { - return endpointConnectionPool; - } - - @Override public void verifyCanDelete() { verifyCanDelete(false); }