On Thu, Jul 9, 2015 at 3:49 PM, William Burns <mudokon...@gmail.com> wrote:
>
> On Thu, Jul 9, 2015 at 5:11 AM Dan Berindei <dan.berin...@gmail.com> wrote:
>>
>> Hi Will
>>
>> After the discussion we started in Galder's PR's comments [1], I
>> started thinking that we should really have a stream() method directly
>> in the Cache/AdvancedCache interface.
>>
>> I feel entrySet(), keySet() or values() encourage users to use
>> external iteration with a for-each loop or iterator(), and we should
>> encourage users to use the Stream methods instead. I also really don't
>> like the idea of users having to close their iterators explicitly, and
>> lazy Iterators (or Spliterators) need to be properly closed or they
>> will leak resources.
>
> The iterator and spliterator are automatically closed if they were fully
> iterated upon.
>
> I don't think pulling in all entries from the cluster (and loader) for
> entrySet, keySet or values is a good idea. Unless you are suggesting that
> we only pull local entries? In that case we have reverted these changes
> back to ISPN 6 and older.
>
> The entrySet, keySet and values collections as of ISPN 7.0 are actually
> completely backing collections, and methods are evaluated per invocation.
> This means any updates to them or to the cache they were created from are
> seen by each other.
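The "backing collection" semantics Will describes can be illustrated with plain JDK types. This is only a sketch of the concept, using ConcurrentHashMap in place of a Cache (it is not Infinispan code): entrySet() returns a live view, so updates to the map and to the view are visible to each other.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of backing-collection semantics: entrySet() is a live view,
// not a snapshot, so changes propagate in both directions.
public class BackingCollectionView {
    public static void main(String[] args) {
        Map<String, Integer> cache = new ConcurrentHashMap<>();
        Set<Map.Entry<String, Integer>> view = cache.entrySet();
        cache.put("a", 1);                // update via the map...
        System.out.println(view.size());  // ...seen by the view
        view.clear();                     // update via the view...
        System.out.println(cache.size()); // ...seen by the map
    }
}
```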
And the iterators are already AutoCloseable, I know. But with the streams
API we can hide resource management from the user, so I was hoping we could
avoid using AutoCloseable altogether.

>> My suggestion, then, is to make entrySet().iterator() and
>> entrySet().spliterator() eager, so that they don't need to implement
>> AutoCloseable. I would even go as far as to say that entrySet() should
>> be eager itself, but maybe keySet().stream() would make a better API
>> than adding a new keysStream() method.
>
> Just so I understand: you are saying that we leave entrySet, keySet and
> values the way they are, so they remain backing collections, however
> invocation of the iterator or spliterator would pull all entries from the
> entire cache into memory at once?

Yes

> It seems throwing
> UnsupportedOperationException with a message stating to use
> stream().iterator() and closing the stream would be better imo (however
> that would preclude the usage of for-each). Note the for-each loop is only
> an issue when you iterate over that collection and break out of the loop
> early.
>
> try (Stream<Map.Entry<K, V>> stream = entrySet.stream()) {
>     Iterator<Map.Entry<K, V>> iterator = stream.iterator();
> }
>
> Actually, I think the issue here is that our CacheCollections don't
> currently implement CloseableIterable like the EntryIterable does. If they
> did, you could do a simple for-each loop with a break inside a
> try-with-resources. We could then document that close() closes any
> iterators or spliterators that were created from this instance of the
> collection.
>
> It is a little awkward, but it could work this way:
>
> try (CacheSet<Map.Entry<K, V>> closeableEntrySet = entrySet) {
>     for (Map.Entry<K, V> entry : closeableEntrySet) {
>     }
> }

On the other hand, you wouldn't need any try-with-resources if you used the
forEach method, because the resources are both acquired and released inside
the forEach call:

entrySet.stream().forEach(entry -> {
    ...
});

But if you make stream() or entrySet() return an AutoCloseable, then users
will put a try-with-resources around it anyway, just in case. So I'd rather
keep the iterator AutoCloseable than the stream/entry set.

Making EntryIterable implement AutoCloseable made sense in Java 7, because
the only thing you could do with it was iterate over it, maybe with
for-each. But in Java 8, Iterable also has a forEach method, and I wouldn't
want users of forEach() to have to think about whether they need a try
block or not.

>> Now to your questions:
>>
>> 1)
>> forEach() doesn't allow the consumer to modify the entries, so I think
>> the most common use case would be doing something with a captured
>> variable (or System.out).
>
> This is actually something I didn't cover in the document. But upon
> thinking about this more, I was thinking we probably want to allow for CDI
> injection of the cache into the consumer action before firing it. That way
> the user can change values as they want. This would behave almost
> identically to map/reduce, however it is much more understandable imo.

You mean like map/reduce when it is configured to store its results in an
intermediate cache? I think you'd also need a CacheStream.reduceOnOwner()
operation to put in the pipeline before forEach(), but yeah, it sounds like
it should work.

>> So I would make forEach execute the consumer
>> on the originator, and maybe add a distributedForEach method that
>> executes its consumer on each owner (accepting that the consumer may
>> be executed twice for some keys, or never, if the originator crashes).
>> distributedForEach probably makes more sense once you start injecting
>> the Cache (or other components) into the remote Consumers.
>
> This was my conundrum before, however I believe I found a happy medium. I
> figured implementing it distributed gives more flexibility. The user can
> still choose to run it locally if they desire.
> For example, you can call *.stream().iterator().forEachRemaining(consumer)
> if you want to do a forEach locally in a single thread. And if you want it
> parallelized, you can do
> StreamSupport.stream(*.stream().spliterator(), true).forEach(consumer)

Except now you have to access the spliterator directly... Will this work,
or will the user need try-with-resources? Do we want users to think about
it?

StreamSupport.stream(*.stream().spliterator(), true).limit(10).forEach(consumer)

> This would all be documented on the forEach method.
>
>> peek()'s intended use case is probably logging progress, so it will
>> definitely need to interact with an external component. However,
>> executing it on the originator would potentially change the execution
>> of the stream dramatically, and adding logging shouldn't have that
>> kind of impact. So peek() should be executed on the remote nodes, even
>> if we don't have remote injection yet.
>
> This is how I ended up doing it: peek is executed remotely.
>
>> 2)
>> I would say implement sorting on the originator from the beginning,
>> and limit() and skip() as well. It's true that users may be
>> disappointed to see that adding limit() doesn't improve the performance
>> of their sorted() execution, but I would rather have a complete API
>> available for applications that don't need to sort the entire cache.
>
> This is how I did it as well :) Basically, if we find that there is a
> sorted, distinct, limit or skip, it performs all of the intermediate
> operations up to that point, then uses an iterator to bring the results
> back locally, where the rest of the pipeline can be performed. Limit and
> distinct are also actually performed remotely first to reduce how many
> results are returned.
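The remote-then-local strategy Will describes for limit can be sketched with plain collections standing in for the cluster's nodes. This is a hypothetical illustration, not Infinispan code: for a sorted().limit(k) pipeline, each "node" sorts and limits its own partition first, so at most k results per node travel back, and the originator merges, re-sorts and re-limits.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of running limit remotely first, then finishing locally.
// Each inner list stands in for one node's partition of the data.
public class RemoteThenLocalLimit {
    public static void main(String[] args) {
        int k = 3;
        List<List<Integer>> partitions = List.of(
                List.of(9, 1, 7, 5), List.of(4, 8, 2, 6));
        // "Remote" phase: per-partition sorted().limit(k) cuts traffic.
        List<Integer> pulledBack = partitions.stream()
                .flatMap(p -> p.stream().sorted().limit(k))
                .collect(Collectors.toList());
        // "Local" phase: the originator finishes the pipeline.
        List<Integer> topK = pulledBack.stream()
                .sorted()
                .limit(k)
                .collect(Collectors.toList());
        System.out.println(topK); // [1, 2, 4]
    }
}
```

The same shape applies to distinct, which is exactly the trade-off debated next: per-node deduplication costs memory on each node but shrinks what crosses the wire.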
> I am not 100% sold on performing distinct remotely first, as it could
> actually be significantly slower, but it should hopefully reduce some
> memory usage :P

Shouldn't running distinct remotely actually require *more* memory, because
now you have to keep track of the unique results on each node? There are
definitely scenarios where running it remotely will save a lot in network
traffic, though.

>> Cheers
>> Dan
>>
>> [1]
>> https://github.com/infinispan/infinispan/pull/3571#discussion-diff-34033399R22
>>
>> On Wed, May 27, 2015 at 9:52 PM, William Burns <mudokon...@gmail.com>
>> wrote:
>> > Hello everyone,
>> >
>> > I wanted to let you know I wrote up a design document for the successor
>> > to EntryRetriever: Distributed Streams [1]!
>> >
>> > Any comments or feedback would be much appreciated.
>> >
>> > I would especially like targeted feedback regarding:
>> >
>> > 1. The operators forEach and peek may want to be run locally. Should
>> > we have an overridden method so users can pick which they want?
>> > Personally, I feel that peek is too specific to matter, and forEach
>> > can always be done by the caller locally if desired.
>> > 2. The intermediate operators limit and skip do not seem worth
>> > implementing unless we have sorting support. (Sorting support will
>> > come later.) I am thinking to not support these until sorting is
>> > added.
>> >
>> > Thanks,
>> >
>> > - Will
>> >
>> > [1]
>> > https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support
>> >
>> > _______________________________________________
>> > infinispan-dev mailing list
>> > infinispan-dev@lists.jboss.org
>> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
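Dan's point earlier in the thread, that a stream-based API can hide resource management entirely inside terminal operations like forEach, can be sketched with plain JDK types. This is a sketch of the idea only, not the Infinispan implementation; the onClose hook stands in for releasing remote iteration resources.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Sketch: a terminal forEach that releases the stream's resources
// itself, so callers never need try-with-resources.
public class SelfClosingForEach {
    static <T> void forEachAndClose(Stream<T> stream, Consumer<T> action) {
        try (Stream<T> s = stream) { // resources always released here
            s.forEach(action);
        }
    }

    public static void main(String[] args) {
        AtomicBoolean released = new AtomicBoolean(false);
        // onClose stands in for cleaning up remote iteration state.
        Stream<String> stream = List.of("a", "b").stream()
                .onClose(() -> released.set(true));
        forEachAndClose(stream, System.out::println);
        System.out.println("released=" + released.get());
    }
}
```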