Thanks John, helpful as always; yes, an async client would be nice. Barry, this is awesome; this is exactly the kind of example I was looking for.
Also, I think native support in Geode for an async result collector would be awesome, and is needed. I know it's difficult, but I think this should be considered for future releases.

Regards

On Tue, Aug 15, 2017 at 6:56 AM, John Blum <[email protected]> wrote:

> Hi Amit-
>
> *Spring Data Geode* does not offer any additional help for streaming
> Function results OOTB, particularly since, as Udo says...
>
> "*The one caveat here is that you have to deal with failure and possible
> duplicates when the function is marked as HA and it might retry/restart
> upon detection of failure.*"
>
> Typically, handling these types of concerns appropriately varies greatly
> from use case to use case, given different application requirements and SLAs.
>
> However, as Udo mentioned, and as the Javadoc
> <http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html>
> [1] for o.a.g.cache.execute.ResultCollector states, specifically this...
>
> "*Results arrive as they are sent using the ResultSender.sendResult(Object)
> <http://gemfire-91-javadocs.docs.pivotal.io/org/apache/geode/cache/execute/ResultSender.html#sendResult-T->
> and can be used as they arrive.*"
>
> ... it is possible to implement a "custom", stream-handling
> o.a.g.cache.execute.ResultCollector.
>
> For example, see here
> <https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java>
> [2].
>
> In fact, I have written a few tests (e.g.
> streamingFunctionWithStreamingResultCollector [3]) to illustrate *Function*
> result streaming and basic stream handling using SDG with a "custom"
> o.a.g.cache.execute.ResultCollector
> <https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L235-L257>
> [4].
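[Editor's note] The "custom, stream-handling ResultCollector" idea above can be sketched as follows. This is a minimal, self-contained sketch: the `SimpleResultCollector` interface below is a simplified stand-in for `o.a.g.cache.execute.ResultCollector` (the real `addResult(..)` also receives the sending `DistributedMember`, and the real interface offers a timed `getResult(long, TimeUnit)` overload); it is not the actual Geode type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in for o.a.g.cache.execute.ResultCollector; the real
// interface also passes the sending DistributedMember to addResult(..).
interface SimpleResultCollector<T> {
    void addResult(T resultOfSingleOperation); // called as each chunk arrives
    List<T> getResult();                       // blocks until endResults()
    void endResults();                         // signals the last result was sent
}

// A streaming collector: addResult(..) hands each incoming chunk to a queue
// immediately instead of buffering everything until the Function completes.
class StreamingResultCollector<T> implements SimpleResultCollector<T> {

    private static final Object END = new Object(); // end-of-stream sentinel

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    @Override
    public void addResult(T result) {
        queue.offer(result); // unbounded queue; offer never fails here
    }

    @Override
    public void endResults() {
        queue.offer(END);
    }

    // Drains results as they arrive; blocks only while the queue is empty
    // and the end-of-stream sentinel has not yet been seen.
    @Override
    @SuppressWarnings("unchecked")
    public List<T> getResult() {
        List<T> results = new ArrayList<>();
        try {
            for (Object next = queue.take(); next != END; next = queue.take()) {
                results.add((T) next); // per-chunk processing would go here instead
            }
        }
        catch (InterruptedException cause) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while waiting for results", cause);
        }
        return results;
    }
}
```

In a real Geode collector, `addResult(..)` is invoked as each `ResultSender.sendResult(..)` chunk arrives and `endResults()` once the last result is sent, so a consumer thread could drain the queue while the execution is still running.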
> The *Function* is implemented as a simple POJO method
> <https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L294-L318>
> [5] using SDG's @GemfireFunction annotation and Geode's
> o.a.g.cache.execute.ResultSender.
>
> There are a few nuances of Geode's API that users should be aware of...
>
> 1. First of all, and most importantly, all
> o.a.g.cache.execute.Execution.execute(..) methods [6] block!
>
> This is unfortunate. I see no obvious reason why execute(..) methods
> need to block, particularly since o.a.g.cache.execute.ResultCollector
> implementations could be made to block if/when necessary.
>
> Minimally, it would have been convenient if the Geode Execution API [6]
> provided 1 or more executeAsync(..) methods.
>
> 2. It is actually not apparent from the ResultCollector API that a
> developer must implement addResult(..) in order to process intermediate
> results as they arrive.
>
> addResult(..) seems more like an internal API Geode uses to "collect"
> results on the receiver's end, especially since the DistributedMember
> parameter does not technically serve much purpose.
>
> As such, a developer might expect that s/he can call Execution.execute(..)
> followed immediately by the ResultCollector.getResult() method to process
> results until the result set (or stream) is exhausted (e.g.
> Iterable/Iterator style). After all, the Javadoc for the
> ResultCollector.getResult() method states...
>
> "*It returns the result of function execution, potentially blocking
> until all the results are available
> <http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html#endResults-->
> has been called.*"
>
> "*Potentially*" is not absolute, and yet here it is absolute (!), since
> Execution.execute(..) blocks unless the developer wraps that call in a
> threaded java.util.concurrent.Executor.
> The only real indication that ResultCollector.getResult(..) is only
> reachable upon completion is... "*returns the result of function
> execution*", which implies that the ResultCollector.getResult() method
> will not return until the *Function* is complete, which is true, and
> also not until the Execution.execute(..) method returns, provided the
> same Thread calls Execution.execute(..) along with
> ResultCollector.getResult().
>
> I would also add that a quick review of *Geode's User Guide* on *Function
> Execution
> <http://geode.apache.org/docs/guide/12/developing/function_exec/chapter_overview.html>*
> [10] is less than clear on this matter as well.
>
> This is certainly an area where SDG can provide added value, given the
> core *Spring Framework's* Async features and Reactive support; I will
> consider this. See DATAGEODE-36
> <https://jira.spring.io/browse/DATAGEODE-36> - "*Add Async, Reactive and
> Streaming support to the Function Annotation support.*
> <https://jira.spring.io/browse/DATAGEODE-36>" [7]
>
> 3. Finally, you are correct in that Geode's
> o.a.g.internal.cache.execute.DefaultResultCollector [8] is a rather
> naive implementation, even for small result sets. It is especially not
> suited for streaming.
>
> By way of example, I offer up an only slightly better implementation here
> <https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L183-L233>
> [9]. Of course, it is subject to the same constraints described in #1 & #2
> above, unless you are multi-threading the execution and result handling.
> Note: [9] is Thread-safe.
>
> Anyway, hope this helps and gives you some guidance for your own
> implementation, which I am certain will be quite a bit more involved.
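[Editor's note] The threading arrangement John describes in nuances #1 and #2 (submitting the blocking execute call to an Executor so the caller can consume results concurrently) can be sketched as follows. This is a self-contained simulation, not Geode code: the producer lambda stands in for a blocking `Execution.execute(..)` call that streams results via `ResultSender.sendResult(..)`, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Because Execution.execute(..) blocks, it is submitted to an Executor so
// the calling thread can process results while the "function" still runs.
class AsyncExecutionSketch {

    static final Object END = new Object(); // end-of-stream sentinel

    public static List<Integer> run() {
        BlockingQueue<Object> results = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for the blocking Execution.execute(..) call.
            executor.submit(() -> {
                for (int chunk = 1; chunk <= 3; chunk++) {
                    results.offer(chunk); // analogous to ResultSender.sendResult(..)
                }
                results.offer(END); // analogous to ResultSender.lastResult(..)
            });

            // Meanwhile, the calling thread consumes results as they arrive.
            List<Integer> collected = new ArrayList<>();
            for (Object next = results.take(); next != END; next = results.take()) {
                collected.add((Integer) next);
            }
            return collected;
        }
        catch (InterruptedException cause) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while consuming results", cause);
        }
        finally {
            executor.shutdownNow();
        }
    }
}
```

With real Geode, the producer side is replaced by the Function execution itself, and the consuming loop lives in (or alongside) the custom ResultCollector.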
> Regards,
> John
>
> [1] http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html
> [2] https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java
> [3] https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L113-L130
> [4] https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L235-L257
> [5] https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L294-L318
> [6] http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/Execution.html
> [7] https://jira.spring.io/browse/DATAGEODE-36
> [8] https://github.com/apache/geode/blob/rel/v1.2.0/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DefaultResultCollector.java
> [9] https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L183-L233
> [10] http://geode.apache.org/docs/guide/12/developing/function_exec/chapter_overview.html
>
> On Mon, Aug 14, 2017 at 9:14 AM, Amit Pandey <[email protected]> wrote:
>
>> Hey Udo,
>>
>> I can do that. However, I was using @GemfireFunction and didn't
>> understand how I could do it. Anyway, I now understand how to do it from
>> the server. Is there any example code for the streaming result collector?
>>
>> John,
>>
>> Does Spring Data Geode have any helpers for this?
>>
>> Regards
>>
>> On Mon, Aug 14, 2017 at 9:40 PM, Udo Kohlmeyer <[email protected]> wrote:
>>
>>> Hi there Amit,
>>>
>>> Have you looked at the ResultSender.sendResult() method on the function?
>>> You can call sendResult() as often as you like to send chunks of 1000
>>> results. You just have to ensure that you "close" the ResultSender by
>>> calling lastResult().
>>>
>>> As for the streaming result collector... Geode does not have a streaming
>>> interface, but you can implement a custom result collector. In this
>>> custom result collector you can embed your processing of chunks in
>>> addResult(). This way you can process data as soon as the collector
>>> receives it.
>>>
>>> The one caveat here is that you have to deal with failure and possible
>>> duplicates when the function is marked as HA, since it might
>>> retry/restart upon detection of failure.
>>>
>>> --Udo
>>>
>>> On 8/14/17 00:14, Amit Pandey wrote:
>>>
>>> Also, in Spring Data Geode, is it possible to send data as soon as I
>>> have a chunk of, say, 1000? I know I can specify a batch size, but I
>>> don't see how I can do it in a streaming fashion.
>>>
>>> On Sun, Aug 13, 2017 at 3:08 PM, Amit Pandey <[email protected]> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have a function which can potentially return very large data sets.
>>>>
>>>> I want to stream data via the function. The default result collector
>>>> of Geode collects all the data in one large chunk, which might result
>>>> in very slow operation times. How can I use a streaming result
>>>> collector? Is there an example of it anywhere?
>>>>
>>>> I am using spring-data-geode, so if there is something available there,
>>>> that would be great too.
>>>>
>>>> Regards
>>>
>>
>
> --
> -John
> john.blum10101 (skype)
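[Editor's note] Udo's chunking advice (call `sendResult()` repeatedly, then close the stream) might look like the following sketch on the Function side. The `SimpleResultSender` interface here is a simplified stand-in for `o.a.g.cache.execute.ResultSender`, whose real methods are `sendResult(T)` and `lastResult(T)`; the `ChunkingSender` class and its method are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for o.a.g.cache.execute.ResultSender, which defines
// sendResult(T) for intermediate results and lastResult(T) to close the stream.
interface SimpleResultSender<T> {
    void sendResult(T intermediateResult);
    void lastResult(T finalResult);
}

// Sends a large result set in fixed-size chunks rather than one big payload,
// as a Geode Function body might do with its ResultSender.
class ChunkingSender {

    static final int CHUNK_SIZE = 1000;

    static void sendInChunks(List<Integer> data, SimpleResultSender<List<Integer>> sender) {
        int offset = 0;
        while (data.size() - offset > CHUNK_SIZE) {
            // sendResult(..) may be called any number of times...
            sender.sendResult(new ArrayList<>(data.subList(offset, offset + CHUNK_SIZE)));
            offset += CHUNK_SIZE;
        }
        // ...but exactly one lastResult(..) must close the stream.
        sender.lastResult(new ArrayList<>(data.subList(offset, data.size())));
    }
}
```

On the receiving side, a custom ResultCollector's `addResult(..)` would then see one List of up to 1000 entries per call, with `endResults()` fired after the last chunk.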
