[ 
https://issues.apache.org/jira/browse/TINKERPOP-1564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687865#comment-15687865
 ] 

Marko A. Rodriguez commented on TINKERPOP-1564:
-----------------------------------------------

To run many {{RemoteTraversals}} on the same machine, split across threads, 
{{IntraPartition}} can be used. This is useful for threading a traversal in a 
single-machine graph system (e.g. TinkerGraph) or, within a multi-machine 
graph system, for having each processor core process its own share of the data 
in a single partition. That is, "thread the workers."

{code}
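import java.util.Iterator;
import java.util.function.Predicate;

import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;

// Partition is the interface under discussion in this issue (import omitted).
public class IntraPartition implements Partition {

    private final Partition parentPartition;
    private final Predicate<Element> inPartitionPredicate;

    public IntraPartition(final Partition parentPartition,
                          final Predicate<Element> inPartitionPredicate) {
        this.parentPartition = parentPartition;
        this.inPartitionPredicate = inPartitionPredicate;
    }

    // an element belongs to this intra-partition if the predicate accepts it
    public boolean contains(final Element element) {
        return this.inPartitionPredicate.test(element);
    }

    // only those vertices of the parent partition that this intra-partition owns
    public Iterator<Vertex> vertices(final Object... ids) {
        return IteratorUtils.filter(this.parentPartition.vertices(ids), this::contains);
    }

    // only those edges of the parent partition that this intra-partition owns
    public Iterator<Edge> edges(final Object... ids) {
        return IteratorUtils.filter(this.parentPartition.edges(ids), this::contains);
    }

    ////////////////////

    public static class HashPartitionPredicate implements Predicate<Element> {
        private final int partitionNumber;
        private final int totalPartitions;

        public HashPartitionPredicate(final int partitionNumber, final int totalPartitions) {
            this.partitionNumber = partitionNumber;
            this.totalPartitions = totalPartitions;
        }

        public boolean test(final Element element) {
            // floorMod rather than %: hashCode() may be negative, and a negative
            // remainder would never equal any partitionNumber
            return this.partitionNumber == Math.floorMod(element.hashCode(), this.totalPartitions);
        }
    }
}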
{code}

{{TinkerGraph}} has a single {{Partition}} as it is a single-machine graph 
system. However, {{IntraPartition}} can be used to create numerous 
"intra-partitions" of the single global partition. Thus, when a traverser is no 
longer in the current intra-partition (thread), it is routed to another thread 
for processing. {{HashPartitionPredicate}} ensures (within reason) a load 
balance of traversers across all the {{IntraPartition}} threads.
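
The routing described above can be sketched without any TinkerPop types. This is 
only an illustration under stated assumptions: element ids stand in for 
traversers, a plain queue stands in for each thread's inbox, and the class and 
method names ({{IntraPartitionRoutingSketch}}, {{ownerOf}}, {{route}}, 
{{newInboxes}}) are made up for this example and are not existing API. The 
ownership rule mirrors {{HashPartitionPredicate}}.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: none of these names exist in TinkerPop.
final class IntraPartitionRoutingSketch {

    // Index of the intra-partition (thread) that owns the given element id.
    static int ownerOf(final Object elementId, final int totalPartitions) {
        return Math.floorMod(elementId.hashCode(), totalPartitions);
    }

    // One inbox per intra-partition; a stand-in for a per-thread work queue.
    static List<Deque<Object>> newInboxes(final int totalPartitions) {
        final List<Deque<Object>> inboxes = new ArrayList<>();
        for (int i = 0; i < totalPartitions; i++) inboxes.add(new ArrayDeque<>());
        return inboxes;
    }

    // Places every id in the inbox of its owner and returns how many ids
    // left the intra-partition that produced them (currentPartition).
    static int route(final int currentPartition, final List<?> elementIds,
                     final List<Deque<Object>> inboxes) {
        int handedOff = 0;
        for (final Object id : elementIds) {
            final int owner = ownerOf(id, inboxes.size());
            if (owner != currentPartition) handedOff++;
            inboxes.get(owner).add(id);
        }
        return handedOff;
    }

    public static void main(final String[] args) {
        final List<Deque<Object>> inboxes = newInboxes(4);
        final int handedOff = route(0, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), inboxes);
        // prints: handed off: 6, inboxes: [[4, 8], [1, 5], [2, 6], [3, 7]]
        System.out.println("handed off: " + handedOff + ", inboxes: " + inboxes);
    }
}
```

With sequential integer ids the split is perfectly even; with real element hash 
codes the balance is only "within reason," as noted above.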

{{DSEGraph}} has multiple {{Partitions}} as it is a multi-machine graph system. 
However, {{IntraPartition}} can be used to create numerous "intra-partitions" 
of each {{Partition}}. Thus, the processing of the traversers within a "data 
local partition" (within a machine) is load balanced across the threads of the 
{{IntraPartitions}}.
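
To make the "intra-partitions of each partition" idea concrete, here is a small 
sketch of splitting the ids held by one parent partition into disjoint views, 
one per thread. It assumes plain ids in place of {{Element}} objects, and the 
names {{IntraPartitionSplitSketch}}, {{hashPredicate}} and {{split}} are 
hypothetical, introduced only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: none of these names exist in TinkerPop.
final class IntraPartitionSplitSketch {

    // Same membership rule as HashPartitionPredicate, applied to plain ids.
    static Predicate<Object> hashPredicate(final int partitionNumber, final int totalPartitions) {
        return id -> Math.floorMod(id.hashCode(), totalPartitions) == partitionNumber;
    }

    // Filters the parent's ids once per intra-partition, yielding disjoint views
    // whose union is the parent partition.
    static List<List<Object>> split(final List<?> parentIds, final int totalPartitions) {
        final List<List<Object>> views = new ArrayList<>();
        for (int p = 0; p < totalPartitions; p++) {
            final Predicate<Object> inPartition = hashPredicate(p, totalPartitions);
            final List<Object> view = new ArrayList<>();
            for (final Object id : parentIds) {
                if (inPartition.test(id)) view.add(id);
            }
            views.add(view);
        }
        return views;
    }

    public static void main(final String[] args) {
        // ids held by one machine's partition; the values are arbitrary
        final List<Integer> parentIds = Arrays.asList(10, 11, 12, 13, 14, 15, -3);
        // prints: [[12, 15, -3], [10, 13], [11, 14]]
        System.out.println(split(parentIds, 3));
    }
}
```

Every id lands in exactly one view, including the negative one, which is why 
the sketch uses {{Math.floorMod}} rather than {{%}}.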

> Distributed OLTP Traversals via RemoteStep
> ------------------------------------------
>
>                 Key: TINKERPOP-1564
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-1564
>             Project: TinkerPop
>          Issue Type: Improvement
>          Components: driver, process, server
>    Affects Versions: 3.2.3
>            Reporter: Marko A. Rodriguez
>
> This proposal unifies OLTP and OLAP into a single framework that removes the 
> need for OLAP {{GraphComputer}} by introducing distributed, data local 
> processing to OLTP. In essence, this is a proposal for a step-by-step query 
> routing framework within {{Traversal}}. This proposal can work across 
> machines in a cluster, across threads on a machine, or hierarchically across 
> both machines and threads. The example presented will discuss distribution 
> across machines in a cluster as it is the most complicated scenario.
> Currently, an OLTP traversal executes at a particular machine (or thread) and 
> pulls vertex/edge/etc. data to it accordingly in order to solve the 
> traversal. In OLAP, the traversal is cloned and distributed to all machines 
> in the cluster and traversals communicate with one another by sending 
> {{Traversers}} (i.e. messages) between themselves ensuring data local 
> processing. Given recent advancements in GremlinServer and 
> {{RemoteTraversal}}, it is possible to add traverser routing to OLTP and 
> thus, effect the computational paradigm of Gremlin OLAP in Gremlin OLTP with 
> some added benefits not possible in Gremlin OLAP.
> Assume a 4 machine cluster and the following traversal:
> {code}
> g.V(1).out('knows').has('age',gt(20)).out('likes').values('name')
> {code}
> Every time there is a "walk" (adjacency), it is possible that the 
> {{Traverser}} is no longer accessing data local to the current machine. In 
> order to do data local query routing, every adjacency would feed into a 
> {{RemoteStep}}. The traversal above would be cloned (via {{Bytecode}} 
> distribution) across the cluster where "sibling" {{RemoteSteps}} would have 
> network access to one another using the same protocol of 
> {{RemoteConnection}}. Thus, given the 4 node cluster example, the above 
> traversal would be overlaid as below. Note that {{remote()}} would not be a 
> new step in the language, but simply provided here to show where 
> {{RemoteStrategy}} would insert {{RemoteSteps}} into the traversal.
> {code}
> g.V(1).out('knows').remote().has('age',gt(20)).out('likes').remote().values('name').remote()
>                        |                                        |                       ^
>     __.out('knows').remote().has('age',gt(20)).out('likes').remote().values('name').remote()
>                        |                                        |                       |
>     __.out('knows').remote().has('age',gt(20)).out('likes').remote().values('name').remote()
>                        |                                        |                       |
>     __.out('knows').remote().has('age',gt(20)).out('likes').remote().values('name').remote()
> {code}
> The top traversal is called the "master traversal" and the other three 
> "worker traversals." Note that this is identical to current Gremlin OLAP. 
> Now, the master traversal would be the traversal that is {{.next()}}'d for 
> results. So, when the "master traversal" is {{next()}}'d, {{g.V(1)}} will 
> fetch {{v[1]}} and then its outgoing knows-adjacencies. These adjacent 
> "reference vertices" would be fed into the first {{remote()}} and a "routing 
> algorithm" would determine where in the cluster the particular vertex's data 
> is. Thus, {{remote()}} ({{RemoteStep}}) serves as a router, pushing 
> {{Traversers}} local to the data. Finally, note that the final 
> {{RemoteSteps}} can only feed back to the "master traversal" for ultimate 
> aggregation and return to the user. 
> TinkerPop currently has all the structures in place to make this possible:
>       1. Encapsulation of computational metadata via {{Traverser}}.
>       2. The ability to detach {{Traversers}} and migrate/serialize them via 
> {{Traverser.detach()}} and {{Traverser.attach()}}.
>       3. The concept of {{ReferenceElement}} so the traverser only carries 
> with it enough information to re-attach at the remote site.
>       4. {{Bytecode}} and the ability to send {{Traversals}} across the 
> cluster.
>       5. GremlinServer and {{RemoteConnection}} with the concept of 
> {{RemoteTraversal}} and proposed {{RemoteStep}} extension.
> What does {{RemoteStep}} look like?
> {code}
> public class RemoteStep<S> extends AbstractStep<S,S> {
>
>     private final Cluster cluster;
>
>     public RemoteStep(final Traversal.Admin traversal, final Cluster cluster) {
>         super(traversal);
>         this.cluster = cluster;
>     }
>
>     public Traverser.Admin<S> processNextStart() {
>         // keep polling while any sibling RemoteStep may still send traversers
>         while (cluster.isOpen(this)) {
>             if (this.starts.hasNext()) {
>                 final Traverser.Admin<S> traverser = this.starts.next();
>                 if (cluster.isLocal(traverser))
>                     return traverser;                  // data is local: continue here
>                 else
>                     cluster.route(traverser.detach()); // data is remote: ship the traverser
>             }
>         }
>         // no sibling has anything left to send: this step is exhausted
>         throw FastNoSuchElementException.instance();
>     }
> }
> {code}
> {{Cluster}} is a concept in GremlinServer that [~spmallette] developed and is 
> more versed in. I will simply discuss what I expect from {{Cluster}}, unaware 
> of what is already possible or the methods to do it.
> * {{Cluster.isLocal(Traverser)}} takes a traverser and returns true if 
> {{Traverser.get()}} references local data. This will probably require some 
> sort of {{hash(traverser.get())}} to determine if the vertex/edge/etc. 
> referenced is local.
> * {{Cluster.route(Traverser)}} takes a traverser and will send it to the 
> remote machine/thread responsible for handling the data referenced by the 
> traverser. Note that a traverser maintains its current {{stepId}} 
> ({{Traverser.getStepId()}}) and thus, is able to put itself into the correct 
> spot in the traversal stream at the remote location.
> * {{Cluster.isOpen(RemoteStep)}} determines whether any of the other sibling 
> {{RemoteSteps}} have data still to process. If not, then there are no more 
> traversers flowing through that section of the traversal and the step can be 
> "closed." (**this is the sketchy part of the proposal as I'm not quite sure 
> the best way to do this**)
> * {{Cluster.vertices()}} will need to be a concept so that traversals can 
> scan only those vertices in their partition. For example, assume {{V()}}.  
> {{Cluster.vertices()}} would make sure that only those vertices in the 
> current partition are fetched.
> Hopefully it is clear that adding distributed OLTP functionality to Gremlin 
> is relatively simple. Gremlin already has all the requisite structure save 
> {{RemoteStep}} and necessary {{Cluster}} methods.
> Here are the benefits of this model:
> * Gremlin OLTP is Gremlin OLAP. The semantics of Gremlin OLAP are exactly 
> what is proposed here but with the added benefit that message passing happens 
> at the partition/subgraph level, not the star vertex level.
> * There is no need for {{SparkGraphComputer}} as GremlinServer now plays the 
> role of SparkServer. The added benefit: no pulling data from the graph 
> database and re-representing it in an RDD or {{SequenceFile}}.
> * No longer are "local children traversals" the boundary for "OLAP." Local 
> children can be processed beyond the star graph, but this would require 
> pulling data from a remote machine when necessary. However, given a good graph 
> partitioning algorithm, local children will most likely NOT leave the 
> subgraph partition and thus, will remain a local computation.
> * Failover is already built into the architecture. If a {{RemoteStep}} can 
> not be accessed, but the machine's data is still available (perhaps via 
> replication), then data will simply be pulled over the wire instead of 
> traversers routed to the "dead node." 
> * The infrastructure for side-effects and reducing barrier steps already 
> implemented for Gremlin OLAP would automatically work for distributed Gremlin 
> OLTP.
> * If the entire graph is hot in-memory across the cluster, then distributed 
> in-memory graph computing is possible. Again, no more linear-scans over 
> partitions like with Giraph/Spark/etc. ({{GraphComputer}}).
> * If transactions are worked out, then distributed OLTP Gremlin provides 
> mutation capabilities (something currently not implemented for 
> {{GraphComputer}}). That is, {{addV}}, {{addE}}, {{drop}}, etc. just work. 
> **Caveat: transactions in this environment across GremlinServer seem 
> difficult.**
> So that's that. This could very well be the future of Gremlin OLAP. The 
> disjoint between OLAP and OLTP would go away, the codebase would be 
> simplified, and the computational gains in terms of performance and 
> expressivity would be great. This is a big deal idea.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
