IGNITE-5776: Add option to turn on filter reachable addresses in TcpCommunicationSpi. This closes #2317.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bd7a08e3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bd7a08e3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bd7a08e3 Branch: refs/heads/ignite-5757 Commit: bd7a08e31d03b2c51b225cf388dc1197348a1593 Parents: e285f9d Author: Evgenii Zhuravlev <[email protected]> Authored: Thu Jul 20 13:32:18 2017 +0300 Committer: Andrey V. Mashenkov <[email protected]> Committed: Thu Jul 20 13:32:18 2017 +0300 ---------------------------------------------------------------------- .../communication/tcp/TcpCommunicationSpi.java | 77 ++++++++++++++------ 1 file changed, 56 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bd7a08e3/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 35d3032..5b952e8 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -311,6 +311,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */ public static final boolean DFLT_TCP_NODELAY = true; + /** Default value for {@code FILTER_REACHABLE_ADDRESSES} socket option (value is <tt>false</tt>). */ + public static final boolean DFLT_FILTER_REACHABLE_ADDRESSES = false; + /** Default received messages threshold for sending ack. */ public static final int DFLT_ACK_SND_THRESHOLD = 32; @@ -1016,6 +1019,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** {@code TCP_NODELAY} option value for created sockets. */ private boolean tcpNoDelay = DFLT_TCP_NODELAY; + /** {@code FILTER_REACHABLE_ADDRESSES} option value for created sockets. */ + private boolean filterReachableAddresses = DFLT_FILTER_REACHABLE_ADDRESSES; + /** Number of received messages after which acknowledgment is sent. */ private int ackSndThreshold = DFLT_ACK_SND_THRESHOLD; @@ -1626,6 +1632,33 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } /** + * Gets value for {@code FILTER_REACHABLE_ADDRESSES} socket option. + * + * @return {@code True} if needed to filter reachable addresses. + */ + public boolean isFilterReachableAddresses() { + return filterReachableAddresses; + } + + /** + * Setting this option to {@code true} enables filter for reachable + * addresses on creating tcp client. + * <p> + * Usually its advised to set this value to {@code false}. + * <p> + * If not provided, default value is {@link #DFLT_FILTER_REACHABLE_ADDRESSES}. + * + * @param filterReachableAddresses {@code True} to filter reachable addresses. + * @return {@code this} for chaining. + */ + @IgniteSpiConfiguration(optional = true) + public TcpCommunicationSpi setFilterReachableAddresses(boolean filterReachableAddresses) { + this.filterReachableAddresses = filterReachableAddresses; + + return this; + } + + /** * Sets receive buffer size for sockets created or accepted by this SPI. * <p> * If not provided, default is {@link #DFLT_SOCK_BUF_SIZE}. @@ -2952,35 +2985,37 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (isExtAddrsExist) addrs.addAll(extAddrs); - Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size()); + if (filterReachableAddresses) { + Set<InetAddress> allInetAddrs = U.newHashSet(addrs.size()); - for (InetSocketAddress addr : addrs) { - // Skip unresolved as addr.getAddress() can return null. - if(!addr.isUnresolved()) - allInetAddrs.add(addr.getAddress()); - } + for (InetSocketAddress addr : addrs) { + // Skip unresolved as addr.getAddress() can return null. + if (!addr.isUnresolved()) + allInetAddrs.add(addr.getAddress()); + } - List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs); + List<InetAddress> reachableInetAddrs = U.filterReachable(allInetAddrs); - if (reachableInetAddrs.size() < allInetAddrs.size()) { - LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size()); + if (reachableInetAddrs.size() < allInetAddrs.size()) { + LinkedHashSet<InetSocketAddress> addrs0 = U.newLinkedHashSet(addrs.size()); - List<InetSocketAddress> unreachableInetAddr = new ArrayList<>(allInetAddrs.size() - reachableInetAddrs.size()); + List<InetSocketAddress> unreachableInetAddr = new ArrayList<>(allInetAddrs.size() - reachableInetAddrs.size()); - for (InetSocketAddress addr : addrs) { - if (reachableInetAddrs.contains(addr.getAddress())) - addrs0.add(addr); - else - unreachableInetAddr.add(addr); - } + for (InetSocketAddress addr : addrs) { + if (reachableInetAddrs.contains(addr.getAddress())) + addrs0.add(addr); + else + unreachableInetAddr.add(addr); + } - addrs0.addAll(unreachableInetAddr); + addrs0.addAll(unreachableInetAddr); - addrs = addrs0; - } + addrs = addrs0; + } - if (log.isDebugEnabled()) - log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']'); + if (log.isDebugEnabled()) + log.debug("Addresses to connect for node [rmtNode=" + node.id() + ", addrs=" + addrs.toString() + ']'); + } boolean conn = false; GridCommunicationClient client = null;
