ank19 commented on a change in pull request #6526:
URL: https://github.com/apache/camel/pull/6526#discussion_r771970676



##########
File path: 
components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
##########
@@ -368,6 +379,197 @@ public void dispose() {
                 return this.client.readValues(0, TimestampsToReturn.Server, 
nodeIds);
             });
         }
+
+        private BrowseResult filter(final BrowseResult browseResult, final 
Pattern pattern) {
+
+            final ReferenceDescription[] references = 
browseResult.getReferences();
+
+            if (null == references || null == pattern) {
+                return browseResult;
+            }
+
+            final List<ReferenceDescription> filteredReferences = new 
ArrayList<>();
+            for (final ReferenceDescription reference : references) {
+                final String id = reference.getNodeId().toParseableString();
+                if (!(pattern.matcher(id).matches())) {
+                    LOG.trace("Node {} excluded by filter", id);
+                    continue;
+                }
+                filteredReferences.add(reference);
+            }
+
+            return new BrowseResult(
+                    browseResult.getStatusCode(), 
browseResult.getContinuationPoint(),
+                    filteredReferences.toArray(new ReferenceDescription[0]));
+        }
+
+        private CompletableFuture<Map<ExpandedNodeId, BrowseResult>> flatten(
+                List<CompletableFuture<Map<ExpandedNodeId, BrowseResult>>> 
browseResults) {
+            return CompletableFuture.allOf(browseResults.toArray(new 
CompletableFuture[0]))
+                    .thenApply(__ -> browseResults
+                            .stream()
+                            .map(CompletableFuture::join)
+                            .map(Map::entrySet)
+                            .flatMap(Set::stream)
+                            .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue, (e1, e2) -> e2)));
+        }
+
+        // Browse at continuation point if any
+        public CompletableFuture<BrowseResult> browse(BrowseResult 
previousBrowseResult) {
+
+            final ByteString continuationPoint = 
previousBrowseResult.getContinuationPoint();
+
+            if (previousBrowseResult.getStatusCode().isGood() && 
continuationPoint.isNotNull()) {
+
+                return this.client.browseNext(false, continuationPoint)
+
+                        .thenCompose(browseResult -> {
+
+                            final ReferenceDescription[] previousReferences = 
previousBrowseResult.getReferences();
+                            final ReferenceDescription[] references = 
browseResult.getReferences();
+
+                            if (null == references) {
+
+                                LOG.info("Browse continuation point -> no 
references");
+                                return completedFuture(previousBrowseResult);
+                            } else if (null == previousReferences) {
+
+                                LOG.info("Browse continuation point -> 
previous references not obtained");
+                                return completedFuture(browseResult);
+                            }
+
+                            final ReferenceDescription[] combined
+                                    = Arrays.copyOf(previousReferences, 
previousReferences.length + references.length);
+                            System.arraycopy(references, 0, combined, 
previousReferences.length, references.length);
+
+                            LOG.debug("Browse continuation point -> {}: {} 
reference(s); total: {} reference(s)",
+                                    browseResult.getStatusCode(), 
references.length, combined.length);
+
+                            return browse(new BrowseResult(
+                                    browseResult.getStatusCode(), 
browseResult.getContinuationPoint(), combined));
+                        });
+
+            } else {
+
+                return completedFuture(previousBrowseResult);
+            }
+        }
+
+        // Browse a single node, retrieve additional results, filter node ids 
and eventually browse deeper into the tree
+        public CompletableFuture<Map<ExpandedNodeId, BrowseResult>> browse(
+                BrowseDescription browseDescription, BrowseResult 
browseResult, int depth, int maxDepth, Pattern pattern,
+                int maxNodesPerRequest) {
+
+            return browse(browseResult)
+
+                    .thenCompose(preliminary -> 
completedFuture(filter(preliminary, pattern)))
+
+                    .thenCompose(filtered -> {
+
+                        final ExpandedNodeId expandedNodeId = 
browseDescription.getNodeId().expanded();
+                        final Map<ExpandedNodeId, BrowseResult> root = 
Collections.singletonMap(expandedNodeId, filtered);
+                        final CompletableFuture<Map<ExpandedNodeId, 
BrowseResult>> finalFuture = completedFuture(root);
+                        final ReferenceDescription[] references = 
filtered.getReferences();
+
+                        if (depth >= maxDepth || null == references) {
+                            return finalFuture;
+                        }
+
+                        final List<CompletableFuture<Map<ExpandedNodeId, 
BrowseResult>>> futures = new ArrayList<>();
+
+                        // Save current node
+                        futures.add(finalFuture);
+
+                        final List<ExpandedNodeId> nodeIds = 
Stream.of(references)
+                                
.map(ReferenceDescription::getNodeId).collect(Collectors.toList());
+
+                        final List<List<ExpandedNodeId>> lists = 
Lists.partition(nodeIds, maxNodesPerRequest);
+                        for (final List<ExpandedNodeId> list : lists) {
+                            futures.add(browse(list, 
browseDescription.getBrowseDirection(),
+                                    
browseDescription.getNodeClassMask().intValue(), depth + 1, maxDepth, pattern,
+                                    browseDescription.getIncludeSubtypes(), 
maxNodesPerRequest));
+                        }
+
+                        return flatten(futures);
+                    });
+        }
+
+        // Browse according to a list of browse descriptions
+        public CompletableFuture<Map<ExpandedNodeId, BrowseResult>> browse(
+                List<BrowseDescription> browseDescriptions,
+                int depth, int maxDepth, Pattern pattern, int 
maxNodesPerRequest) {
+
+            return this.client.browse(browseDescriptions)
+
+                    .thenCompose(partials -> {
+
+                        // Fail a bit more gracefully in case of missing 
results
+                        if (partials.size() != browseDescriptions.size()) {
+
+                            // @TODO: Replace with Java 9 functionality
+                            final CompletableFuture<Map<ExpandedNodeId, 
BrowseResult>> failedFuture = new CompletableFuture<>();
+                            failedFuture.completeExceptionally(new 
IllegalArgumentException(
+                                    format(
+                                            "Invalid number of browse results: 
%s, expected %s", partials.size(),
+                                            browseDescriptions.size())));
+                            return failedFuture;
+                        }
+
+                        final List<CompletableFuture<Map<ExpandedNodeId, 
BrowseResult>>> futures = new ArrayList<>();
+
+                        for (int i = 0; i < partials.size(); i++) {
+
+                            futures.add(browse(browseDescriptions.get(i), 
partials.get(i), depth, maxDepth, pattern,
+                                    maxNodesPerRequest));
+                        }
+
+                        return flatten(futures);
+                    });
+        }
+
+        // Wrapper for looking up nodes and instantiating initial browse 
descriptions according to the configuration provided
+        @SuppressWarnings("unchecked")
+        public CompletableFuture<Map<ExpandedNodeId, BrowseResult>> browse(
+                List<ExpandedNodeId> expandedNodeIds, BrowseDirection 
direction, int nodeClasses, int depth, int maxDepth,
+                Pattern pattern, boolean includeSubTypes, int 
maxNodesPerRequest) {
+
+            final CompletableFuture<NodeId>[] futures = 
expandedNodeIds.stream().map(this::lookupNamespace)
+                    .toArray(CompletableFuture[]::new);
+
+            return CompletableFuture.allOf(futures)
+
+                    .thenCompose(__ -> {
+
+                        List<NodeId> nodeIds = 
Stream.of(futures).map(CompletableFuture::join)
+                                .collect(Collectors.toList());
+
+                        return completedFuture(nodeIds.stream().map(nodeId -> 
new BrowseDescription(
+                                nodeId, direction, Identifiers.References, 
includeSubTypes, uint(nodeClasses),
+                                
uint(BrowseResultMask.All.getValue()))).collect(Collectors.toList()));
+                    })
+
+                    .thenCompose(descriptions -> browse(descriptions, depth, 
maxDepth, pattern, maxNodesPerRequest))
+
+                    .whenComplete((actual, error) -> {
+
+                        if (!LOG.isErrorEnabled()) {
+
+                            return;
+                        }
+
+                        final String expandedNodeIdsString = 
expandedNodeIds.stream()
+                                .map(ExpandedNodeId::toParseableString)
+                                .collect(Collectors.joining(", "));
+
+                        if (actual != null) {
+                            LOG.debug("Browse node(s) {} -> {} result(s)", 
expandedNodeIdsString, actual.size());
+
+                        } else {
+                            LOG.error("Browse node(s) {} -> failed: {}", 
expandedNodeIdsString, error);

Review comment:
       Yes, that makes sense, of course. I changed that: I moved it to the 
producer, which also reduces the complexity of the SubscriptionManager class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to