Thanks, John, helpful as always. Yes, an async client will be nice.

Barry, this is awesome. Yes, this is the kind of example I was looking for.

Also, I think native support for an async ResultCollector in Geode would be
awesome and needed. I know it is difficult, but I think this should be
considered in future releases.

Regards

On Tue, Aug 15, 2017 at 6:56 AM, John Blum <[email protected]> wrote:

> Hi Amit-
>
> *Spring Data Geode* does not offer any additional help for streaming
> Function results OOTB, particularly since, as Udo says...
>
> "*The one caveat here is that you have to deal with failure and possible
> duplicates when the function is marked as HA and it might retry/restart
> upon detection of failure.*"
>
> Typically, handling these types of concerns appropriately varies greatly
> from use case to use case, given different application requirements and SLAs.
>
> However, as Udo mentioned, and as the Javadoc
> <http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html>
> [1] for o.a.g.cache.execute.ResultCollector states, specifically this...
>
> "*Results arrive as they are sent using
> the ResultSender.sendResult(Object)
> <http://gemfire-91-javadocs.docs.pivotal.io/org/apache/geode/cache/execute/ResultSender.html#sendResult-T->
>  and
> can be used as they arrive.*"
>
> ... it is possible to implement a "custom", stream-handling
> o.a.g.cache.execute.ResultCollector.
>
> For example, see here
> <https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java>
>  [2].
>
> In fact, I have written a few tests (e.g.
> streamingFunctionWithStreamingResultCollector [3]) to illustrate *Function*
> result streaming and basic stream handling using SDG with a "custom"
> o.a.g.cache.execute.ResultCollector
> <https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L235-L257>
>  [4].
> The *Function* is implemented as a simple POJO method
> <https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L294-L318>
>  [5]
> using SDG's @GemfireFunction annotation and Geode's
> o.a.g.cache.execute.ResultSender.
>
>
> There are a few nuances of Geode's API that users should be aware of...
>
>
> 1. First of all, and most importantly, all
> o.a.g.cache.execute.Execution.execute(..) methods [6] block!
>
> This is unfortunate.  I see no obvious reason why the execute(..) methods
> need to block, particularly since o.a.g.cache.execute.ResultCollector
> implementations could be made to block if/when necessary.
>
> Minimally, it would have been convenient if the Geode Execution API [6]
> provided one or more executeAsync(..) methods.
>
>
> 2. It is actually not apparent from the ResultCollector API that a
> developer must implement addResult(..) in order to process intermediate
> results as they arrive.
>
> addResult(..) seems more like an internal API Geode uses to "collect"
> results on the receiving end, especially since the DistributedMember
> parameter does not technically serve much purpose.
>
> As such, a developer might expect that s/he can call Execution.execute(..)
> and then immediately use the ResultCollector.getResult() method to process
> results until the result set (or stream) is exhausted (e.g.
> Iterable/Iterator style).  After all, the Javadoc for the
> ResultCollector.getResult() method states...
>
> "*It returns the result of function execution, potentially blocking
> until all the results are available
> <http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html#endResults-->
>  has
> been called.*"
>
> "*Potentially*" is not absolute, yet in practice it is (!), since
> Execution.execute(..) blocks unless a developer wraps the call
> with a threaded java.util.concurrent.Executor.  The only real indication
> that ResultCollector.getResult() is only reachable upon completion
> is... "*returns the result of function execution*", which implies that
> the ResultCollector.getResult() method will not return until the
> *Function* is complete. That is true, and the *Function* is not complete
> until the Execution.execute(..) method returns, provided the same Thread
> calls Execution.execute(..) along with ResultCollector.getResult().
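To make the Executor workaround concrete, here is a minimal, self-contained sketch of offloading the blocking call to another Thread. The blockingExecute() method below is only a stand-in for Execution.execute(..).getResult(), since the real call requires a running cluster:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncExecutionSketch {

  // Stand-in for Execution.execute("myFunction").getResult(), which blocks
  // until the Function has finished on the servers.
  static List<Object> blockingExecute() {
    try {
      Thread.sleep(100); // simulate the Function running remotely
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return Arrays.asList("result1", "result2");
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // Wrap the blocking call so the calling Thread stays free to do other work.
    CompletableFuture<List<Object>> futureResults =
      CompletableFuture.supplyAsync(AsyncExecutionSketch::blockingExecute, executor);

    // ... do other work here ...

    List<Object> results = futureResults.join(); // block only when results are needed
    System.out.println(results);

    executor.shutdown();
  }
}
```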
>
> I would also add that a quick review of *Geode's User Guide* on *Function
> Execution
> <http://geode.apache.org/docs/guide/12/developing/function_exec/chapter_overview.html>*
>  [10]
> is less than clear on this matter.
>
> This is certainly an area where SDG can provide added value given the core
> *Spring Framework's* Async features and Reactive support; I will consider
> this; see DATAGEODE-36 <https://jira.spring.io/browse/DATAGEODE-36> - "*Add
> Async, Reactive and Streaming support to the Function Annotation support.*
> <https://jira.spring.io/browse/DATAGEODE-36>" [7]
>
>
> 3. Finally, you are correct in that Geode's
> o.a.g.internal.cache.execute.DefaultResultCollector [8] is a rather naive
> implementation, even for small result sets.  It is especially not suited
> for streaming.
>
> By way of example, I offer up an only slightly better implementation here
> <https://github.com/jxblum/spring-gemfire-tests/blob/master/src/test/java/org/spring/data/gemfire/cache/PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L183-L233>
>  [9].
> Of course, it is subject to the same constraints described in #1 & #2
> above, unless you multi-thread the execution and result handling.
> Note: [9] is Thread-safe.
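The concurrency core of such a streaming collector can be sketched without any Geode types (the Geode wiring, i.e. actually implementing o.a.g.cache.execute.ResultCollector, is omitted here; see [9] for the real thing). The assumption is that addResult(..) would delegate to add(..) and endResults() to end():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Thread-safe core of a streaming result collector: the consumer processes
// results as they arrive instead of waiting for one large buffered collection.
public class StreamingResults<T> {

  private static final Object END_OF_STREAM = new Object();

  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

  // Invoked by the receiving thread per result (what addResult(..) would call).
  public void add(T result) {
    queue.offer(result);
  }

  // Invoked once all results have been sent (what endResults() would call).
  public void end() {
    queue.offer(END_OF_STREAM);
  }

  // Invoked by the consuming thread; blocks per result, not for the whole set.
  // Returns null once the end-of-stream sentinel is reached.
  @SuppressWarnings("unchecked")
  public T take() throws InterruptedException {
    Object next = queue.take();
    return next == END_OF_STREAM ? null : (T) next;
  }

  public static void main(String[] args) throws InterruptedException {
    StreamingResults<String> results = new StreamingResults<>();

    // Producer stands in for Geode invoking addResult(..) as results arrive.
    Thread producer = new Thread(() -> {
      for (int i = 1; i <= 3; i++) {
        results.add("result" + i);
      }
      results.end();
    });
    producer.start();

    List<String> consumed = new ArrayList<>();
    for (String result = results.take(); result != null; result = results.take()) {
      consumed.add(result); // process each result as soon as it arrives
    }
    System.out.println(consumed);
  }
}
```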
>
>
> Anyway, hope this helps and gives you some guidance for your own
> implementation, which I am certain will be quite a bit more involved.
>
> Regards,
> John
>
> [1] 
> *http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html
> <http://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/execute/ResultCollector.html>*
> [2] https://github.com/jxblum/spring-gemfire-tests/blob/
> master/src/test/java/org/spring/data/gemfire/cache/
> PeerCacheFunctionExecutionResultStreamingIntegrationTests.java
> [3] https://github.com/jxblum/spring-gemfire-tests/blob/
> master/src/test/java/org/spring/data/gemfire/cache/
> PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L113-L130
> [4] https://github.com/jxblum/spring-gemfire-tests/blob/
> master/src/test/java/org/spring/data/gemfire/cache/
> PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L235-L257
> [5] https://github.com/jxblum/spring-gemfire-tests/blob/
> master/src/test/java/org/spring/data/gemfire/cache/
> PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L294-L318
> [6] http://geode.apache.org/releases/latest/javadoc/org/
> apache/geode/cache/execute/Execution.html
> [7] https://jira.spring.io/browse/DATAGEODE-36
> [8] https://github.com/apache/geode/blob/rel/v1.2.0/geode-
> core/src/main/java/org/apache/geode/internal/cache/execute/
> DefaultResultCollector.java
> [9] https://github.com/jxblum/spring-gemfire-tests/blob/
> master/src/test/java/org/spring/data/gemfire/cache/
> PeerCacheFunctionExecutionResultStreamingIntegrationTests.java#L183-L233
> [10] http://geode.apache.org/docs/guide/12/developing/
> function_exec/chapter_overview.html
>
>
> On Mon, Aug 14, 2017 at 9:14 AM, Amit Pandey <[email protected]>
> wrote:
>
>> Hey Udo,
>>
>> I can do that. However, I was using @GemfireFunction and didn't understand
>> how I could do it. Anyway, I now understand how to do it from the server.
>> Is there any example code for the streaming result collector?
>>
>> John,
>>
>> Does Spring Data Geode have any helpers for this ?
>>
>> Regards
>>
>> On Mon, Aug 14, 2017 at 9:40 PM, Udo Kohlmeyer <[email protected]> wrote:
>>
>>> Hi there Amit.
>>>
>>> Have you looked at the ResultSender.sendResult() method on the function?
>>> You can use sendResult() as often as you like to send chunks of 1000
>>> results. You just have to ensure that you "close" the resultSender by
>>> calling sendLast().
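A minimal sketch of that chunking approach (class and Function id are hypothetical; I am assuming a partitioned region and using PartitionRegionHelper to fetch the local data, with the chunk size of 1000 from the description above):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.cache.partition.PartitionRegionHelper;

// Sends results in chunks of 1000 and "closes" the stream with sendLast().
public class ChunkingFunction implements Function {

  protected static final int CHUNK_SIZE = 1000;

  @Override
  public void execute(FunctionContext context) {
    RegionFunctionContext regionContext = (RegionFunctionContext) context;
    ResultSender<Object> sender = context.getResultSender();

    List<Object> chunk = new ArrayList<>(CHUNK_SIZE);

    for (Object value : PartitionRegionHelper.getLocalDataForContext(regionContext).values()) {
      chunk.add(value);
      if (chunk.size() == CHUNK_SIZE) {
        sender.sendResult(new ArrayList<>(chunk)); // ship a full chunk
        chunk.clear();
      }
    }

    sender.sendLast(chunk); // final (possibly partial/empty) chunk ends the stream
  }

  @Override
  public boolean hasResult() {
    return true;
  }

  @Override
  public String getId() {
    return "chunkingFunction";
  }
}
```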
>>>
>>> As for the streaming result collector... Geode does not have a streaming
>>> interface, but you can implement a custom result collector. In this custom
>>> result collector you can embed your processing of chunks in the
>>> "addResult". This way you can process data as soon as the collector
>>> receives it.
>>>
>>> The one caveat here is that you have to deal with failure and possible
>>> duplicates when the function is marked as HA and it might retry/restart
>>> upon detection of failure.
>>>
>>> --Udo
>>>
>>> On 8/14/17 00:14, Amit Pandey wrote:
>>>
>>> Also, in Spring Data Geode, is it possible to send data as soon as I have
>>> a chunk of, say, 1000? I know I can specify a batch size, but I don't see
>>> how I can do it in a streaming fashion.
>>>
>>> On Sun, Aug 13, 2017 at 3:08 PM, Amit Pandey <[email protected]>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have a function which can potentially return very large data sets.
>>>>
>>>> I want to stream data via the functions. The default result
>>>> collector in Geode collects all the data in one large chunk, which might
>>>> result in very slow operation times. How can I use a streaming result
>>>> collector? Is there an example of it anywhere?
>>>>
>>>> I am using spring-data-geode so if there is something available there
>>>> that will be great too.
>>>>
>>>> Regards
>>>>
>>>
>>>
>>>
>>
>
>
> --
> -John
> john.blum10101 (skype)
>
