ank19 commented on a change in pull request #6526: URL: https://github.com/apache/camel/pull/6526#discussion_r771970676
########## File path: components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java ########## @@ -368,6 +379,197 @@ public void dispose() { return this.client.readValues(0, TimestampsToReturn.Server, nodeIds); }); } + + private BrowseResult filter(final BrowseResult browseResult, final Pattern pattern) { + + final ReferenceDescription[] references = browseResult.getReferences(); + + if (null == references || null == pattern) { + return browseResult; + } + + final List<ReferenceDescription> filteredReferences = new ArrayList<>(); + for (final ReferenceDescription reference : references) { + final String id = reference.getNodeId().toParseableString(); + if (!(pattern.matcher(id).matches())) { + LOG.trace("Node {} excluded by filter", id); + continue; + } + filteredReferences.add(reference); + } + + return new BrowseResult( + browseResult.getStatusCode(), browseResult.getContinuationPoint(), + filteredReferences.toArray(new ReferenceDescription[0])); + } + + private CompletableFuture<Map<ExpandedNodeId, BrowseResult>> flatten( + List<CompletableFuture<Map<ExpandedNodeId, BrowseResult>>> browseResults) { + return CompletableFuture.allOf(browseResults.toArray(new CompletableFuture[0])) + .thenApply(__ -> browseResults + .stream() + .map(CompletableFuture::join) + .map(Map::entrySet) + .flatMap(Set::stream) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e2))); + } + + // Browse at continuation point if any + public CompletableFuture<BrowseResult> browse(BrowseResult previousBrowseResult) { + + final ByteString continuationPoint = previousBrowseResult.getContinuationPoint(); + + if (previousBrowseResult.getStatusCode().isGood() && continuationPoint.isNotNull()) { + + return this.client.browseNext(false, continuationPoint) + + .thenCompose(browseResult -> { + + final ReferenceDescription[] previousReferences = previousBrowseResult.getReferences(); + final ReferenceDescription[] references = browseResult.getReferences(); + + if (null == references) { + + LOG.info("Browse continuation point -> no references"); + return completedFuture(previousBrowseResult); + } else if (null == previousReferences) { + + LOG.info("Browse continuation point -> previous references not obtained"); + return completedFuture(browseResult); + } + + final ReferenceDescription[] combined + = Arrays.copyOf(previousReferences, previousReferences.length + references.length); + System.arraycopy(references, 0, combined, previousReferences.length, references.length); + + LOG.debug("Browse continuation point -> {}: {} reference(s); total: {} reference(s)", + browseResult.getStatusCode(), references.length, combined.length); + + return browse(new BrowseResult( + browseResult.getStatusCode(), browseResult.getContinuationPoint(), combined)); + }); + + } else { + + return completedFuture(previousBrowseResult); + } + } + + // Browse a single node, retrieve additional results, filter node ids and eventually browse deeper into the tree + public CompletableFuture<Map<ExpandedNodeId, BrowseResult>> browse( + BrowseDescription browseDescription, BrowseResult browseResult, int depth, int maxDepth, Pattern pattern, + int maxNodesPerRequest) { + + return browse(browseResult) + + .thenCompose(preliminary -> completedFuture(filter(preliminary, pattern))) + + .thenCompose(filtered -> { + + final ExpandedNodeId expandedNodeId = browseDescription.getNodeId().expanded(); + final Map<ExpandedNodeId, BrowseResult> root = Collections.singletonMap(expandedNodeId, filtered); + final CompletableFuture<Map<ExpandedNodeId, BrowseResult>> finalFuture = completedFuture(root); + final ReferenceDescription[] references = filtered.getReferences(); + + if (depth >= maxDepth || null == references) { + return finalFuture; + } + + final List<CompletableFuture<Map<ExpandedNodeId, BrowseResult>>> futures = new ArrayList<>(); + + // Save current node + futures.add(finalFuture); + + final List<ExpandedNodeId> nodeIds = Stream.of(references) + .map(ReferenceDescription::getNodeId).collect(Collectors.toList()); + + final List<List<ExpandedNodeId>> lists = Lists.partition(nodeIds, maxNodesPerRequest); + for (final List<ExpandedNodeId> list : lists) { + futures.add(browse(list, browseDescription.getBrowseDirection(), + browseDescription.getNodeClassMask().intValue(), depth + 1, maxDepth, pattern, + browseDescription.getIncludeSubtypes(), maxNodesPerRequest)); + } + + return flatten(futures); + }); + } + + // Browse according to a list of browse descriptions + public CompletableFuture<Map<ExpandedNodeId, BrowseResult>> browse( + List<BrowseDescription> browseDescriptions, + int depth, int maxDepth, Pattern pattern, int maxNodesPerRequest) { + + return this.client.browse(browseDescriptions) + + .thenCompose(partials -> { + + // Fail a bit more gracefully in case of missing results + if (partials.size() != browseDescriptions.size()) { + + // @TODO: Replace with Java 9 functionality + final CompletableFuture<Map<ExpandedNodeId, BrowseResult>> failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(new IllegalArgumentException( + format( + "Invalid number of browse results: %s, expected %s", partials.size(), + browseDescriptions.size()))); + return failedFuture; + } + + final List<CompletableFuture<Map<ExpandedNodeId, BrowseResult>>> futures = new ArrayList<>(); + + for (int i = 0; i < partials.size(); i++) { + + futures.add(browse(browseDescriptions.get(i), partials.get(i), depth, maxDepth, pattern, + maxNodesPerRequest)); + } + + return flatten(futures); + }); + } + + // Wrapper for looking up nodes and instantiating initial browse descriptions according to the configuration provided + @SuppressWarnings("unchecked") + public CompletableFuture<Map<ExpandedNodeId, BrowseResult>> browse( + List<ExpandedNodeId> expandedNodeIds, BrowseDirection direction, int nodeClasses, int depth, int maxDepth, + Pattern pattern, boolean includeSubTypes, int maxNodesPerRequest) { + + final CompletableFuture<NodeId>[] futures = expandedNodeIds.stream().map(this::lookupNamespace) + .toArray(CompletableFuture[]::new); + + return CompletableFuture.allOf(futures) + + .thenCompose(__ -> { + + List<NodeId> nodeIds = Stream.of(futures).map(CompletableFuture::join) + .collect(Collectors.toList()); + + return completedFuture(nodeIds.stream().map(nodeId -> new BrowseDescription( + nodeId, direction, Identifiers.References, includeSubTypes, uint(nodeClasses), + uint(BrowseResultMask.All.getValue()))).collect(Collectors.toList())); + }) + + .thenCompose(descriptions -> browse(descriptions, depth, maxDepth, pattern, maxNodesPerRequest)) + + .whenComplete((actual, error) -> { + + if (!LOG.isErrorEnabled()) { + + return; + } + + final String expandedNodeIdsString = expandedNodeIds.stream() + .map(ExpandedNodeId::toParseableString) + .collect(Collectors.joining(", ")); + + if (actual != null) { + LOG.debug("Browse node(s) {} -> {} result(s)", expandedNodeIdsString, actual.size()); + + } else { + LOG.error("Browse node(s) {} -> failed: {}", expandedNodeIdsString, error); Review comment: Yes, makes sense, of course. I changed that - moved it to the producer, that reduces the complexity in the SubscriptionManager class, too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org