ibessonov commented on code in PR #6042:
URL: https://github.com/apache/ignite-3/pull/6042#discussion_r2152034994


##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -107,82 +123,74 @@ public MulticastNodeFinder(
         this.resultWaitMillis = resultWaitMillis;
         this.ttl = ttl;
         this.localAddressToAdvertise = localAddressToAdvertise;
-        this.nodeName = nodeName;
-        this.listenerThreadPool = Executors.newSingleThreadExecutor(NamedThreadFactory.create(nodeName, "multicast-node-listener", LOG));
+
+        this.threadFactory = NamedThreadFactory.create(nodeName, "multicast-node-finder", LOG);
+        this.listenerThreadPool = Executors.newSingleThreadExecutor(threadFactory);
     }
 
     @Override
     public Collection<NetworkAddress> findNodes() {
-        Collection<NetworkInterface> interfaces = getEligibleNetworkInterfaces();
-        if (interfaces.isEmpty()) {
-            throw new IgniteInternalException(INTERNAL_ERR, "No network interfaces eligible for a multicast found");
-        }
-
         Set<NetworkAddress> result = new HashSet<>();
-        List<CompletableFuture<Collection<NetworkAddress>>> findOnInterfaceFutures = new ArrayList<>();
 
-        ExecutorService executor = Executors.newFixedThreadPool(
-                interfaces.size(),
-                NamedThreadFactory.create(nodeName, "multicast-node-finder", LOG)
-        );
+        // Creates multicast sockets for all eligible interfaces and an unbound socket. Will throw an exception if no sockets were created.
+        List<MulticastSocket> sockets = createSockets(0, resultWaitMillis, false);
 
-        try {
-            for (NetworkInterface networkInterface : interfaces) {
-                findOnInterfaceFutures.add(supplyAsync(() -> findOnInterface(networkInterface), executor));
-            }
+        ExecutorService executor = Executors.newFixedThreadPool(sockets.size(), threadFactory);
 
-            for (CompletableFuture<Collection<NetworkAddress>> future : findOnInterfaceFutures) {
-                result.addAll(future.get(resultWaitMillis * REQ_ATTEMPTS * 2L, TimeUnit.MILLISECONDS));
+        try {
+            // Will contain nodes, found on all eligible interfaces and an unbound socket.
+            List<CompletableFuture<Collection<NetworkAddress>>> futures = sockets.stream()
+                    .map(socket -> supplyAsync(() -> findOnSocket(socket), executor))
+                    .collect(toList());
+
+            // Collect results. Futures shouldn't throw exceptions or hang.
+            for (CompletableFuture<Collection<NetworkAddress>> future : futures) {
+                result.addAll(future.join());

Review Comment:
   You removed the timeout, why?

   `join()` is uninterruptible, please don't use it.
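
To illustrate the point above, here is a minimal sketch of collecting results with a bounded, interruptible wait instead of `join()`. It is not code from this PR: the class `TimedCollect`, the helper `collect`, and the policy of skipping timed-out or failed futures are assumptions made for the example. The only API it relies on is `CompletableFuture.get(long, TimeUnit)`, which throws `InterruptedException` when the waiting thread is interrupted, whereas `join()` does not.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedCollect {
    /** Collects results from all futures, waiting at most timeoutMillis for each one. */
    static <T> Set<T> collect(List<CompletableFuture<Collection<T>>> futures, long timeoutMillis)
            throws InterruptedException {
        Set<T> result = new HashSet<>();

        for (CompletableFuture<Collection<T>> future : futures) {
            try {
                // get(timeout, unit) is interruptible and bounded; join() is neither.
                result.addAll(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
            } catch (TimeoutException e) {
                // Hypothetical policy: give up on this future and keep what the others returned.
                future.cancel(true);
            } catch (ExecutionException e) {
                // Hypothetical policy: skip a failed lookup; real code would likely log the cause.
            }
        }

        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Collection<String>> done = CompletableFuture.completedFuture(List.of("a", "b"));
        CompletableFuture<Collection<String>> hung = new CompletableFuture<>(); // never completes

        // Returns after roughly 100 ms instead of blocking forever on the second future.
        System.out.println(collect(List.of(done, hung), 100));
    }
}
```

With `join()` in place of `get(...)`, the call in `main` would never return, and interrupting the calling thread would not unblock it.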



##########
modules/network/src/main/java/org/apache/ignite/internal/network/MulticastNodeFinder.java:
##########
@@ -107,82 +123,74 @@ public MulticastNodeFinder(
         this.resultWaitMillis = resultWaitMillis;
         this.ttl = ttl;
         this.localAddressToAdvertise = localAddressToAdvertise;
-        this.nodeName = nodeName;
-        this.listenerThreadPool = Executors.newSingleThreadExecutor(NamedThreadFactory.create(nodeName, "multicast-node-listener", LOG));
+
+        this.threadFactory = NamedThreadFactory.create(nodeName, "multicast-node-finder", LOG);

Review Comment:
   Should we really use the same thread factory for both pools? That looks
   inconvenient to me: the thread names will be confusing. The old name
   ("multicast-node-listener") made more sense. Why did you change it?
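
A minimal sketch of what the comment above suggests: one thread factory per pool, so that thread names show which pool a thread belongs to. The helper `named` is a hypothetical stand-in for `NamedThreadFactory.create(nodeName, poolName, LOG)`, and its name format is an assumption, not the format Ignite actually produces.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPools {
    /** Hypothetical stand-in for NamedThreadFactory: names threads "<node>-<pool>-<n>". */
    static ThreadFactory named(String nodeName, String poolName) {
        AtomicInteger counter = new AtomicInteger();
        return task -> new Thread(task, nodeName + "-" + poolName + "-" + counter.getAndIncrement());
    }

    /** Runs a task on the pool and reports the name of the thread that executed it. */
    static String threadNameOf(ExecutorService pool) throws Exception {
        return pool.submit(() -> Thread.currentThread().getName()).get();
    }

    public static void main(String[] args) throws Exception {
        // Separate factories: each pool gets its own recognizable prefix.
        ExecutorService listenerPool = Executors.newSingleThreadExecutor(named("node1", "multicast-node-listener"));
        ExecutorService finderPool = Executors.newFixedThreadPool(2, named("node1", "multicast-node-finder"));

        try {
            System.out.println(threadNameOf(listenerPool));
            System.out.println(threadNameOf(finderPool));
        } finally {
            listenerPool.shutdown();
            finderPool.shutdown();
        }
    }
}
```

If both pools shared a single factory, every thread would carry the same prefix, and a thread dump would not show whether a thread is listening or searching.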



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

