Does frameworks like reactor not resolve this issue with back pressure
and other sexy tricks?

Cheers
Pieter

On Fri, 2022-07-29 at 13:51 +0100, Oleksandr Porunov wrote:
> I'm also not sure but for some reason I feel that we may need some
> event loop to be implemented if we want to re-write it to async
> capabilities.
> The reason I'm telling it is because I feel that in some situations
> we may trigger a very long call stack.
> I.e.:
> Promise -> Promise -> Promise -> .... 
> 
> I guess that could be in a situation when the next part of the
> execution depends on the previous part of the execution. For example,
> 
> g.V().has("hello", "world").barrier(50).limit(5)
> 
> So, let's assume the next things about the execution of the above
> query:
> - It is executed in JanusGraph with batch query enabled (graph
> provider specific, but it's easier for me to focus on a concrete
> implementation)
> - There is no necessary index for that property (again, graph
> provider specific)
> - There are only 4 vertices with such property
> - We have 50 million vertices in total
> 
> If the above facts are true then the query will be executed like the
> following:
> 1) get first (or next) 50 vertices
> 2) filter out unmatched vertices
> 3) if the limit is not reached 5 then process the next vertices by
> starting from step "1" again. Otherwise, return data.
>  
> So, in fact, with the above scenario we will traverse all 50 million
> vertices in the graph which will result in about 1 million chain
> calls for promises based implementation. That will probably result in
> StackOverflowException.
> With synchronous code we don't have these problems because we don't
> call a new function recursively each time we need to retrieve part of
> the data.
> We can overcome the above issue by implementing some kind of event
> loop where we put all the results and then a single thread running
> that loop will call necessary async functions.
> If so, we will never have a long chain of calls. The only thing in
> such architecture is that it will require all those functions to be
> non-blocking and thus all providers will probably need to implement
> their async functionality 
> (doesn't matter if it is real async or "fake" async behind a thread
> pool).
> 
> That's just something on top of my mind, but I could be wrong and
> could miss some points. So, maybe we don't need any event loop, I
> just didn't investigate it enough yet.
> 
> Oleksandr
> 
> 
> On Fri, Jul 29, 2022 at 1:09 PM Oleksandr Porunov
> <alexandr.poru...@gmail.com> wrote:
> > Hi Divij,
> > 
> > Thanks for joining the conversation!
> > 
> > Basically the `promise` step is for remote queries execution only
> > which is seen from the `promis` method "throw new
> > IllegalStateException("Only traversals created using withRemote()
> > can be used in an async way");".
> > Under the hood Gremlin Server will be doing sync execution still
> > even if the graph provider can support async query execution.
> > The Gremlin Server will need to allocate 200 threads if we are
> > trying to execute 200 parallel queries. 
> > g.V().has("name", "hello").out("world") - this query usually will
> > first try to find vertex id for `hello` name and block the thread
> > until it is processed. After that it will fire another query to the
> > storage backend to retrieve all adjacent vertices following the
> > "world" edge and block the executing thread until we receive a
> > response from the database. After that it will return data back to
> > the client. 
> > Instead of that I wanted to propose a different execution strategy.
> > Let's fire the first query and instead of waiting for the response
> > let's say when the response is available, please execute another
> > step - which is get all adjacent vertices following the "world"
> > edge. At that step we are doing the same, we are firing a database
> > query and instead of waiting for the response we say, when the
> > response is available, please execute another step (which is kind
> > of a final step, which simply returns data to the client by
> > executing a necessary function).
> > I guess something like this would help to not allocate unnecessary
> > threads which simply wait for the response from the underlying
> > storage backends.
> > 
> > Oleksandr
> > 
> > On Fri, Jul 29, 2022 at 8:34 AM Divij Vaidya
> > <divijvaidy...@gmail.com> wrote:
> > > Hey folks
> > > 
> > > Interesting discussion! 
> > > 
> > > I am a bit confused though since I believe we already have async
> > > execution implemented in TinkerPop Java client. Let me try to
> > > clarify and please let me know if I missed something.
> > > 
> > > Java client uses a small number of websocket connections to
> > > multiplex multiple queries to the server. You can think of it as
> > > a pipe established to the server on which we could send messages
> > > belonging to different queries. On the server, these messages are
> > > queued until one of the execution threads can pick it up. Once a
> > > request is picked for execution, the results are returned in a
> > > pipelines/streaming manner i.e. the server calculates a batch of
> > > results (size of batch is configurable per query), and sends the
> > > results as messages on the same WebSocket channel. On the client
> > > size, these results are stored in a queue until the application
> > > thread consumes them uses an iterator. This model of execution
> > > *does not block the application thread* and hence, provides async
> > > capabilities. 
> > > 
> > > A sample code to achieve this would be as follows:
> > > 
> > > ```
> > > final Cluster cluster = Cluster.build("localhost")
> > >                                               .port(8182)
> > >                                              
> > > .maxInProcessPerConnection(32)
> > >                                              
> > > .maxSimultaneousUsagePerConnection(32)
> > >                                              
> > > .serializer(Serializers.GRAPHBINARY_V1D0)
> > >                                               .create();
> > > 
> > > try {
> > >           final GraphTraversalSource g =
> > > traversal().withRemote(DriverRemoteConnection.using(cluster));
> > >           CompletableFuture<List<Object>> result =
> > > g.V().has("name",
> > > "pumba").out("friendOf").id().promise(Traversal::toList);
> > > 
> > >           // do some application layer stuff
> > >           // ...
> > >           // ...
> > >           // ...
> > > 
> > >           List<Object> verticesWithNamePumba = result.join();
> > >           System.out.println(verticesWithNamePumba);
> > > } finally {
> > >           cluster.close();
> > > }
> > > ```
> > > 
> > > Note that, in the above example, the thread executing the above
> > > code is not blocked until we call "result.join()".
> > > 
> > > Does this address the use that Oleksandr brought up at the
> > > beginning of this thread?
> > > 
> > > --
> > > Divij Vaidya
> > > 
> > > 
> > > 
> > > On Fri, Jul 29, 2022 at 4:05 AM Oleksandr Porunov
> > > <alexandr.poru...@gmail.com> wrote:
> > > > Hmm, that's interesting! Thank you Joshua for the idea!
> > > > So, I guess the general idea here could be:
> > > > we can start small and start implementing async functionality
> > > > for some
> > > > parts instead of implement async functionality for everything
> > > > straightaway.
> > > > 
> > > > Oleksandr
> > > > 
> > > > On Fri, Jul 29, 2022, 00:38 Joshua Shinavier
> > > > <j...@fortytwo.net> wrote:
> > > > 
> > > > > Well, the wrapper I mentioned before did not require a full
> > > > rewrite of
> > > > > TinkerPop :-) Rather, it provided async interfaces for
> > > > vertices and edges,
> > > > > on which operations like subgraph and shortest paths queries
> > > > were evaluated
> > > > > in an asynchronous fashion (using a special language, as it
> > > > happened, but
> > > > > limited Gremlin queries would have been an option). So I
> > > > think a basic
> > > > > async API might be a useful starting point even if it doesn't
> > > > go very deep.
> > > > >
> > > > > Josh
> > > > >
> > > > >
> > > > > On Thu, Jul 28, 2022 at 4:21 PM Oleksandr Porunov <
> > > > > alexandr.poru...@gmail.com> wrote:
> > > > >
> > > > >> Hi Joshua and Pieter,
> > > > >>
> > > > >> Thank you for joining the conversation!
> > > > >>
> > > > >> I didn't actually look into the implementation details yet
> > > > but quickly
> > > > >> checking Traversal.java code I think Pieter is right here.
> > > > >> For some reason I thought we could simply wrap synchronous
> > > > method in
> > > > >> asynchronous, basically something like:
> > > > >>
> > > > >> // the method which should be implemented by a graph
> > > > provider
> > > > >>
> > > > >> Future<E> executeAsync(Callable<E> func);
> > > > >>
> > > > >> public default Future<E> asyncNext(){
> > > > >>     return executeAsync(this::next);
> > > > >> }
> > > > >>
> > > > >> but checking that code I think I was wrong about it.
> > > > Different steps may
> > > > >> execute different logic (i.e. different underlying storage
> > > > queries) for
> > > > >> different graph providers.
> > > > >> Thus, wrapping only terminal steps into async functions
> > > > won't solve the
> > > > >> problem most likely.
> > > > >>
> > > > >> I guess it will require re-writing or extending all steps to
> > > > be able to
> > > > >> pass an async state instead of a sync state.
> > > > >>
> > > > >> I'm not familiar enough with the TinkerPop code yet to claim
> > > > that, so
> > > > >> probably I could be wrong.
> > > > >> I will need to research it a bit more to find out but I
> > > > think that Pieter
> > > > >> is most likely right about a massive re-write.
> > > > >>
> > > > >> Nevertheless, even if that requires massive re-write, I
> > > > would be eager to
> > > > >> start the ball rolling.
> > > > >> I think we either need to try to implement async execution
> > > > in TinkerPop 3
> > > > >> or start making some concrete decisions regarding TinkerPop
> > > > 4.
> > > > >>
> > > > >> I see Marko A. Rodriguez started to work on RxJava back in
> > > > 2019 here
> > > > >>
> > > > https://github.com/apache/tinkerpop/tree/4.0-dev/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava
> > > > >>
> > > > >> but the process didn't go as far as I understand. I guess it
> > > > would be
> > > > >> good to know if we want to completely rewrite TinkerPop in
> > > > version 4 or not.
> > > > >>
> > > > >> If we want to completely rewrite TinkerPop in version 4 then
> > > > I assume it
> > > > >> may take quite some time to do so. In this case I would be
> > > > more likely to
> > > > >> say that it's better to implement async functionality in
> > > > TinkerPop 3 even
> > > > >> if it requires rewriting all steps.
> > > > >>
> > > > >> In case TinkerPop 4 is a redevelopment with breaking changes
> > > > but without
> > > > >> starting to rewrite the whole functionality then I guess we
> > > > could try to
> > > > >> work on TinkerPop 4 by introducing async functionality and
> > > > maybe applying
> > > > >> more breaking changes in places where it's better to re-work
> > > > some parts.
> > > > >>
> > > > >> Best regards,
> > > > >> Oleksandr
> > > > >>
> > > > >>
> > > > >> On Thu, Jul 28, 2022 at 7:47 PM pieter gmail
> > > > <pieter.mar...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>> Hi,
> > > > >>>
> > > > >>> Does this not imply a massive rewrite of TinkerPop? In
> > > > particular the
> > > > >>> iterator chaining pattern of steps should follow a reactive
> > > > style of
> > > > >>> coding?
> > > > >>>
> > > > >>> Cheers
> > > > >>> Pieter
> > > > >>>
> > > > >>>
> > > > >>> On Thu, 2022-07-28 at 15:18 +0100, Oleksandr Porunov wrote:
> > > > >>> > I'm interested in adding async capabilities to TinkerPop.
> > > > >>> >
> > > > >>> > There were many discussions about async capabilities for
> > > > TinkerPop
> > > > >>> > but
> > > > >>> > there was no clear consensus on how and when it should be
> > > > developed.
> > > > >>> >
> > > > >>> > The benefit for async capabilities is that the user
> > > > calling a query
> > > > >>> > shouldn't need its thread to be blocked to simply wait
> > > > for the result
> > > > >>> > of
> > > > >>> > the query execution. Instead of that a graph provider
> > > > should take
> > > > >>> > care
> > > > >>> > about implementation of async queries execution.
> > > > >>> > If that's the case then many graph providers will be able
> > > > to optimize
> > > > >>> > their
> > > > >>> > execution of async queries by handling less resources for
> > > > the query
> > > > >>> > execution.
> > > > >>> > As a real example of potential benefit we could get I
> > > > would like to
> > > > >>> > point
> > > > >>> > on how JanusGraph executes CQL queries to process Gremlin
> > > > queries.
> > > > >>> > CQL result retrieval:
> > > > >>> >
> > > > >>>
> > > > https://github.com/JanusGraph/janusgraph/blob/15a00b7938052274fe15cf26025168299a311224/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLSimpleSliceFunction.java#L45
> > > > >>> >
> > > > >>> > As seen from the code above, JanusGraph already leverages
> > > > async
> > > > >>> > functionality for CQL queries under the hood but
> > > > JanusGraph is
> > > > >>> > required to
> > > > >>> > process those queries in synced manner, so what
> > > > JanusGraph does - it
> > > > >>> > simply
> > > > >>> > blocks the whole executing thread until result is
> > > > returned instead of
> > > > >>> > using
> > > > >>> > async execution.
> > > > >>> >
> > > > >>> > Of course, that's just a case when we can benefit from
> > > > async
> > > > >>> > execution
> > > > >>> > because the underneath storage backend can process async
> > > > queries. If
> > > > >>> > a
> > > > >>> > storage backend can't process async queries then we won't
> > > > get any
> > > > >>> > benefit
> > > > >>> > from implementing a fake async executor.
> > > > >>> >
> > > > >>> > That said, I believe quite a few graph providers may
> > > > benefit from
> > > > >>> > having a
> > > > >>> > possibility to execute queries in async fashion because
> > > > they can
> > > > >>> > optimize
> > > > >>> > their resource utilization.
> > > > >>> > I believe that we could have a feature flag for storage
> > > > providers
> > > > >>> > which
> > > > >>> > want to implement async execution. Those who can't
> > > > implement it or
> > > > >>> > don't
> > > > >>> > want to implement it may simply disable async
> > > > capabilities which will
> > > > >>> > result in throwing an exception anytime an async function
> > > > is called.
> > > > >>> > I
> > > > >>> > think it should be fine because we already have some
> > > > feature flags
> > > > >>> > like
> > > > >>> > that for graph providers. For example "Null Semantics"
> > > > was added in
> > > > >>> > TinkerPop 3.5.0 but `null` is not supported for all graph
> > > > providers.
> > > > >>> > Thus,
> > > > >>> > a feature flag for Null Semantics exists like
> > > > >>> >
> > > > "g.getGraph().features().vertex().supportsNullPropertyValues()"
> > > > .
> > > > >>> > I believe we can enable async in TinkerPop 3 by providing
> > > > async as a
> > > > >>> > feature flag and letting graph providers implement it at
> > > > their will.
> > > > >>> > Moreover if a graph provider wants to have async
> > > > capabilities but
> > > > >>> > their
> > > > >>> > storage backends don't support async capabilities then it
> > > > should be
> > > > >>> > easy to
> > > > >>> > hide async execution under an ExecutorService which
> > > > mimics async
> > > > >>> > execution.
> > > > >>> > I believe we could do that for TinkerGraph so that users
> > > > could
> > > > >>> > experiment
> > > > >>> > with async API at least. I believe we could simply have a
> > > > default
> > > > >>> > "async"
> > > > >>> > function implementation for TinkerGraph which wraps all
> > > > sync
> > > > >>> > executions in
> > > > >>> > a function and sends it to that ExecutorService (we can
> > > > discuss which
> > > > >>> > one).
> > > > >>> > In such a case TinkerGraph will support async execution
> > > > even without
> > > > >>> > real
> > > > >>> > async functionality. We could also potentially provide
> > > > some
> > > > >>> > configuration
> > > > >>> > options to TinkerGraph to configure thread pool size,
> > > > executor
> > > > >>> > service
> > > > >>> > implementation, etc.
> > > > >>> >
> > > > >>> > I didn't think about how it is better to implement those
> > > > async
> > > > >>> > capabilities
> > > > >>> > for TinkerPop yet but I think reusing a similar approach
> > > > like in
> > > > >>> > Node.js
> > > > >>> > which returns Promise when calling Terminal steps could
> > > > be good. For
> > > > >>> > example, we could have a method called `async` which
> > > > accepts a
> > > > >>> > termination
> > > > >>> > step and returns a necessary Future object.
> > > > >>> > I.e.:
> > > > >>> > g.V(123).async(Traversal.next())
> > > > >>> > g.V().async(Traversal.toList())
> > > > >>> > g.E().async(Traversal.toSet())
> > > > >>> > g.E().async(Traversal.iterate())
> > > > >>> >
> > > > >>> > I know that there were discussions about adding async
> > > > functionality
> > > > >>> > to
> > > > >>> > TinkerPop 4 eventually, but I don't see strong reasons
> > > > why we
> > > > >>> > couldn't add
> > > > >>> > async functionality to TinkerPop 3 with a feature flag.
> > > > >>> > It would be really great to hear some thoughts and
> > > > concerns about it.
> > > > >>> >
> > > > >>> > If there are no concerns, I'd like to develop a proposal
> > > > for further
> > > > >>> > discussion.
> > > > >>> >
> > > > >>> > Best regards,
> > > > >>> > Oleksandr Porunov
> > > > >>>
> > > > >>>

Reply via email to