Github user neykov commented on a diff in the pull request:
https://github.com/apache/brooklyn-server/pull/497#discussion_r97348289
--- Diff:
utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
---
@@ -69,86 +65,113 @@ public ReachableSocketFinder(Predicate<? super
HostAndPort> socketTester, Listen
}
/**
- *
+ * Returns the first element of sockets that is reachable.
* @param sockets The host-and-ports to test
* @param timeout Max time to try to connect to the ip:port
*
* @return The reachable ip:port
- * @throws NoSuchElementException If no ports accessible within the
given time
- * @throws NullPointerException If the sockets or duration is null
+ * @throws NoSuchElementException If no ports are accessible within
the given time
+ * @throws NullPointerException If sockets or timeout is null
* @throws IllegalStateException If the sockets to test is empty
*/
- public HostAndPort findOpenSocketOnNode(final Collection<? extends
HostAndPort> sockets, Duration timeout) {
+ public HostAndPort findOpenSocketOnNode(final Iterable<? extends
HostAndPort> sockets, Duration timeout) {
checkNotNull(sockets, "sockets");
- checkState(sockets.size() > 0, "No hostAndPort sockets supplied");
-
+ checkState(!Iterables.isEmpty(sockets), "No hostAndPort sockets
supplied");
+ checkNotNull(timeout, "timeout");
LOG.debug("blocking on any reachable socket in {} for {}",
sockets, timeout);
-
- final AtomicReference<HostAndPort> result = new
AtomicReference<HostAndPort>();
- boolean passed = Repeater.create("socket-reachable")
- .limitTimeTo(timeout)
- .backoffTo(Duration.FIVE_SECONDS)
- .until(new Callable<Boolean>() {
- @Override
- public Boolean call() {
- Optional<HostAndPort> reachableSocket =
tryReachable(sockets, Duration.FIVE_SECONDS);
- if (reachableSocket.isPresent()) {
- result.compareAndSet(null,
reachableSocket.get());
- return true;
- }
- return false;
- }})
- .run();
-
- if (passed) {
- LOG.debug("<< socket {} opened", result);
- assert result.get() != null;
- return result.get();
+ Iterator<HostAndPort> iter = findOpenSocketsOnNode(sockets,
timeout).iterator();
+ if (iter.hasNext()) {
+ return iter.next();
} else {
LOG.warn("No sockets in {} reachable after {}", sockets,
timeout);
throw new NoSuchElementException("could not connect to any
socket in " + sockets);
}
}
/**
- * Checks if any any of the given HostAndPorts are reachable. It
checks them all concurrently, and
- * returns the first that is reachable (or Optional.absent).
+ * Returns an iterable of the elements in sockets that are reachable.
The order of elements
+ * in the iterable corresponds to the order of the elements in the
input, not the order in which their
+ * reachability was determined. Iterators are unmodifiable and are
evaluated lazily.
+ *
+ * @param sockets The host-and-ports to test
+ * @param timeout Max time to try to connect to each ip:port
+ * @return An iterable containing all sockets that are reachable
according to {@link #socketTester}.
+ * @throws NullPointerException If sockets or timeout is null
+ * @throws IllegalStateException If the sockets to test is empty
*/
- private Optional<HostAndPort> tryReachable(Collection<? extends
HostAndPort> sockets, Duration timeout) {
- final AtomicReference<HostAndPort> reachableSocket = new
AtomicReference<HostAndPort>();
- final CountDownLatch latch = new CountDownLatch(1);
- List<ListenableFuture<?>> futures = Lists.newArrayList();
+ public Iterable<HostAndPort> findOpenSocketsOnNode(final Iterable<?
extends HostAndPort> sockets, Duration timeout) {
+ checkNotNull(sockets, "sockets");
+ checkState(!Iterables.isEmpty(sockets), "No hostAndPort sockets
supplied");
+ checkNotNull(timeout, "timeout");
+ return Optional.presentInstances(tryReachable(sockets, timeout));
+ }
+
+ /**
+ * @return A lazily computed Iterable containing present values for
the elements of sockets that are
+ * reachable according to {@link #socketTester} and absent values for
those not. Checks are concurrent
+ * and the elements in the Iterable are ordered according to their
position in sockets.
+ */
+ private Iterable<Optional<HostAndPort>> tryReachable(Iterable<?
extends HostAndPort> sockets, final Duration timeout) {
+ final List<ListenableFuture<Optional<HostAndPort>>> futures =
Lists.newArrayList();
+ final AtomicReference<Stopwatch> sinceFirstCompleted = new
AtomicReference<>();
+
for (final HostAndPort socket : sockets) {
- futures.add(userExecutor.submit(new Runnable() {
+ futures.add(userExecutor.submit(new
Callable<Optional<HostAndPort>>() {
+ @Override
+ public Optional<HostAndPort> call() {
+ // Whether the socket was reachable (vs. the result of
call, which is whether the repeater is done).
+ final AtomicBoolean theResultWeCareAbout = new
AtomicBoolean();
+ Repeater.create("socket-reachable")
+ .limitTimeTo(timeout)
+ .backoffTo(Duration.FIVE_SECONDS)
+ .until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws
TimeoutException {
+ boolean reachable =
socketTester.apply(socket);
+ if (reachable) {
+ theResultWeCareAbout.set(true);
+ return true;
+ } else {
+ // Run another check if nobody
else has completed yet or another task has
+ // completed but this one is still
in its grace period.
+ Stopwatch timerSinceFirst =
sinceFirstCompleted.get();
+ return timerSinceFirst != null &&
Duration.FIVE_SECONDS.subtract(Duration.of(timerSinceFirst)).isNegative();
+ }
+ }
+ })
+ .run();
+ if (theResultWeCareAbout.get()) {
+ sinceFirstCompleted.compareAndSet(null,
Stopwatch.createStarted());
+ }
+ return theResultWeCareAbout.get() ?
Optional.of(socket) : Optional.<HostAndPort>absent();
+ }
+ }));
+ }
+
+ return new Iterable<Optional<HostAndPort>>() {
+ @Override
+ public Iterator<Optional<HostAndPort>> iterator() {
+ return new AbstractIterator<Optional<HostAndPort>>() {
+ int count = 0;
+
@Override
- public void run() {
- try {
- if (socketTester.apply(socket)) {
- reachableSocket.compareAndSet(null,
socket);
- latch.countDown();
+ protected Optional<HostAndPort> computeNext() {
+ if (count < futures.size()) {
+ final Future<Optional<HostAndPort>> future =
futures.get(count++);
+ try {
+ return
future.get(timeout.toUnit(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
--- End diff --
Can we guarantee that all threads return after `timeout` expires? Could we
get into a state where each call to `computeNext` (or at least the first few
calls) blocks?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---