clebertsuconic commented on code in PR #5603: URL: https://github.com/apache/activemq-artemis/pull/5603#discussion_r2027733373
########## artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java: ########## @@ -593,6 +617,129 @@ public void onConnection(ClientSessionFactoryInternal sf) { // false, // localMember.getConnector().a, // localMember.getConnector().b); + + startTopologyScanner(); + } + + @Override + public void connectorsChanged(List<DiscoveryEntry> newConnectors) { + discoveryEntries = newConnectors; + + if (topology.getMembers().size() > 1) { + startTopologyScanner(); + } + } + + public TopologyScanner getTopologyScanner() { + return topologyScanner; + } + + private synchronized void startTopologyScanner() { + if (topologyScannerAttempts != 0 && !stopping) { + if (topologyScanner == null) { + topologyScanner = new TopologyScanner(scheduledExecutor, executor, retryInterval, TimeUnit.MILLISECONDS, true); + } + + if (!topologyScanner.isStarted()) { + topologyScanner.start(); + } + + topologyScannerCounter.set(0); + + topologyScanner.delay(); + } + } + + public final class TopologyScanner extends ActiveMQScheduledComponent { + private volatile boolean running = false; + + public boolean isRunning() { + return running; + } + + TopologyScanner(ScheduledExecutorService scheduledExecutorService, Executor executor, long checkPeriod, TimeUnit timeUnit, boolean onDemand) { + super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand); + } + + @Override + public boolean delay() { + running = true; + + return super.delay(); + } + + @Override + public void run() { + TransportConfiguration[] transportConfigurations = null; + + if (clusterConnector instanceof DiscoveryClusterConnector) { + List<DiscoveryEntry> discoveredEntries = discoveryEntries; + + if (discoveredEntries != null) { + transportConfigurations = discoveryEntries.stream() + .map(discoveryEntry -> discoveryEntry.getConnector()) + .toArray(TransportConfiguration[]::new); + } else { + logger.debug("No discovered entries"); + transportConfigurations = new TransportConfiguration[0]; + } + } else if (clusterConnector instanceof StaticClusterConnector) { + transportConfigurations = serverLocator.getStaticTransportConfigurations(); + } else { + throw new IllegalStateException("Cluster connector not supported"); + } + + boolean topologyUpdated = updateTopology(transportConfigurations); + + int topologyScannerCount = topologyScannerCounter.incrementAndGet(); + + boolean retry = (topologyScannerAttempts == -1 || topologyScannerCount < topologyScannerAttempts); + + if (!topologyUpdated && !stopping && retry) { + delay(); + } else { + running = false; + + if (!topologyUpdated && !stopping) { + ActiveMQServerLogger.LOGGER.incompleteClusterTopology(name.toString(), topology, topology.getMembers().toString()); + } + } + } + + private boolean updateTopology(TransportConfiguration[] transportConfigurations) { + boolean result = true; + + for (TransportConfiguration transportConfiguration : transportConfigurations) { + if (!topology.getMembers().stream().anyMatch(member -> connector.isSameTarget(transportConfiguration) || + member.getPrimary() != null && member.getPrimary().isSameTarget(transportConfiguration) || + member.getBackup() != null && member.getBackup().isSameTarget(transportConfiguration))) { + + try (ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology, true, transportConfiguration)) { + targetLocator.setReconnectAttempts(0); + targetLocator.setInitialConnectAttempts(0); + targetLocator.setConnectionTTL(connectionTTL); + targetLocator.setCallTimeout(serverLocator.getCallTimeout()); + targetLocator.setNodeID(serverLocator.getNodeID()); + targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration()); + targetLocator.setIdentity("(Cluster-topology-scanner::" + server.toString() + ")"); + + try { + try (ClientSessionFactoryInternal targetClientSessionFactory = targetLocator.connect()) { + targetClientSessionFactory.waitForTopology(serverLocator.getCallTimeout(), TimeUnit.MILLISECONDS); + } + } catch (ActiveMQException e) { Review Comment: I would log the outcome of this at least as a log.debug -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact