Re: VertexUpdateFunction
Hi Ali,

If you're talking about a Java Map passed to the constructor of your compute function, then your implementation is not really scalable.

Cheers,
Martin

On 31.05.2017 14:19, rost...@informatik.uni-leipzig.de wrote:

Hi Martin,

thanks for your answer. For the vertex degree, I passed a map (vertex_id -> degree) to the constructor.

Regards,
Ali

Quoting Martin Junghanns:

Hi Ali :)

You could compute the degrees beforehand (e.g. using the Graph.[in|out|get]Degrees() methods) and use the resulting dataset as a new vertex dataset. You can now run your vertex-centric computation and access the degrees as vertex value.

Cheers,
Martin

On 29.05.2017 09:28, rost...@informatik.uni-leipzig.de wrote:

Hi,

I want to write an iterative algorithm using Gelly (Spargel), like:

https://ci.apache.org/projects/flink/flink-docs-release-0.8/spargel_guide.html

My question is how I can access actual vertex information, like the vertex degree (in- or out-degree), inside a subclass of VertexUpdateFunction. One way might be to pass an instance of the graph to the constructor. However, getting the degree for a vertex in each instance of this subclass would be really inefficient.

Thanks,
Ali
Re: VertexUpdateFunction
Hi Ali :)

You could compute the degrees beforehand (e.g. using the Graph.[in|out|get]Degrees() methods) and use the resulting dataset as a new vertex dataset. You can now run your vertex-centric computation and access the degrees as vertex value.

Cheers,
Martin

On 29.05.2017 09:28, rost...@informatik.uni-leipzig.de wrote:

Hi,

I want to write an iterative algorithm using Gelly (Spargel), like:

https://ci.apache.org/projects/flink/flink-docs-release-0.8/spargel_guide.html

My question is how I can access actual vertex information, like the vertex degree (in- or out-degree), inside a subclass of VertexUpdateFunction. One way might be to pass an instance of the graph to the constructor. However, getting the degree for a vertex in each instance of this subclass would be really inefficient.

Thanks,
Ali
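In Java, a sketch of this approach could look as follows (hedged: in Flink 1.x the degree methods return Tuple2<K, LongValue>, older releases used a plain Long, so adjust the map function accordingly):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.types.LongValue;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Graph<Long, Double, Double> graph = ...; // your input graph

    // 1) Compute the degrees once, up front.
    DataSet<Vertex<Long, Long>> degreeVertices = graph.outDegrees()
        .map(new MapFunction<Tuple2<Long, LongValue>, Vertex<Long, Long>>() {
            @Override
            public Vertex<Long, Long> map(Tuple2<Long, LongValue> t) {
                return new Vertex<>(t.f0, t.f1.getValue());
            }
        });

    // 2) Build a new graph whose vertex values carry the degree.
    Graph<Long, Long, Double> degreeGraph =
        Graph.fromDataSet(degreeVertices, graph.getEdges(), env);

    // 3) Run the vertex-centric iteration on degreeGraph; inside the
    //    update function, vertex.getValue() now returns the degree,
    //    so no side-loaded Java Map is needed.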
Re: Support/connector for Neo4j?
Please let me know if you need help with the connector or if you want to extend it.

Cheers,
Martin

On 24.03.2017 16:07, alex.decastro wrote:

Thanks Tim! I missed that one on Jira. :-)
Re: Context-specific step function in Iteration
Hi again,

I had a bug in my logic. It works as expected (which is perfect). So maybe for others:

Problem:
- execute superstep-dependent UDFs on datasets which do not have access to the iteration context

Solution:
- add a dummy element to the working set (W) at the beginning of the step function
- extract the dummy from W using a filter function
- convert the dummy into a one-element DataSet holding the current superstep using a map function (see the sketch below)
- broadcast that one-element dataset to the UDFs applied on the "external" datasets
- filter out non-dummy elements (if necessary) and continue the step function

Note that it should also work with cross instead of broadcasting; I have not yet tested which way is faster.

Apologies if anyone thought about this when it was my error in the end :)

Cheers,
Martin

On 29.05.2016 14:05, Martin Junghanns wrote:

Hi everyone,

In a step function (bulk), I'd like to join the working set W with another dataset T. The join field of T depends on the current superstep. Unfortunately, W has no access to the iteration runtime context.

I tried to extract the current superstep at the beginning of the step function and broadcast it to a UDF applied on T (which sets the correct join field value), and then perform the join always on the same fields. Unfortunately, this does not seem to work either.

I could work around that by replicating the elements of T and joining multiple times, but this does not scale very well.

Any suggestion would be appreciated.

Cheers and thank you,
Martin
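A sketch of the dummy-element trick, under assumed types (the working set W and the external set T both hold Tuple2<Long, Long>, and an id of -1L marks the dummy; all names here are hypothetical):

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    // 1) Extract the dummy element from the working set.
    DataSet<Tuple2<Long, Long>> dummy =
        workset.filter(new FilterFunction<Tuple2<Long, Long>>() {
            @Override
            public boolean filter(Tuple2<Long, Long> t) {
                return t.f0 == -1L;
            }
        });

    // 2) A rich function *inside* the step function can read the superstep.
    DataSet<Integer> superstep =
        dummy.map(new RichMapFunction<Tuple2<Long, Long>, Integer>() {
            @Override
            public Integer map(Tuple2<Long, Long> t) {
                return getIterationRuntimeContext().getSuperstepNumber();
            }
        });

    // 3) Broadcast the one-element dataset to the UDF applied on T, which
    //    has no direct access to the iteration context.
    DataSet<Tuple2<Long, Long>> adjustedT =
        t.map(new RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            private int currentSuperstep;

            @Override
            public void open(Configuration parameters) {
                currentSuperstep = getRuntimeContext()
                    .<Integer>getBroadcastVariable("superstep").get(0);
            }

            @Override
            public Tuple2<Long, Long> map(Tuple2<Long, Long> t) {
                // pick the superstep-dependent join field here
                return t;
            }
        }).withBroadcastSet(superstep, "superstep");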
Context-specific step function in Iteration
Hi everyone,

In a step function (bulk), I'd like to join the working set W with another dataset T. The join field of T depends on the current superstep. Unfortunately, W has no access to the iteration runtime context.

I tried to extract the current superstep at the beginning of the step function and broadcast it to a UDF applied on T (which sets the correct join field value), and then perform the join always on the same fields. Unfortunately, this does not seem to work either.

I could work around that by replicating the elements of T and joining multiple times, but this does not scale very well.

Any suggestion would be appreciated.

Cheers and thank you,
Martin
Re: Memory ran out PageRank
Hi,

I understand the confusion. So far, I did not run into the problem, but I think this needs to be addressed, as all our graph processing abstractions are implemented on top of the delta iteration. According to the previous mailing list discussion, the problem is with the solution set and its missing ability to spill. If this is still the case, we should open an issue for that.

Any further opinions on that?

Cheers,
Martin

On 14.03.2016 17:55, Ovidiu-Cristian MARCU wrote:

Thank you for this alternative. I don't understand how the workaround will fix this on systems with limited memory and maybe a larger graph. Running Connected Components on the same graph gives the same problem.

    IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED
    java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory: 65601536 Message: Index: 32, Size: 31
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)

Best,
Ovidiu

On 14 Mar 2016, at 17:36, Martin Junghanns wrote:

Hi,

I think this is the same issue we had before on the list [1]. Stephan recommended the following workaround: a possible workaround is to use the option "setSolutionSetUnmanaged(true)" on the iteration. That will eliminate the fragmentation issue, at least.

Unfortunately, you cannot set this when using graph.run(new PageRank(...)). I created a Gist which shows you how to set this using PageRank:

https://gist.github.com/s1ck/801a8ef97ce374b358df

Please let us know if it worked out for you.

Cheers,
Martin

[1] http://mail-archives.apache.org/mod_mbox/flink-user/201508.mbox/%3CCAELUF_ByPAB%2BPXWLemPzRH%3D-awATeSz4sGz4v9TmnvFku3%3Dx3A%40mail.gmail.com%3E

On 14.03.2016 16:55, Ovidiu-Cristian MARCU wrote:

Hi,

While running PageRank on a synthetic graph I ran into this problem. Any advice on how I should proceed to overcome this memory issue?

    IterationHead(Vertex-centric iteration (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 | org.apache.flink.graph.library.PageRank$RankMesseng$
    java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory: 50659328 Message: Index: 25, Size: 24
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)

Thanks!

Best,
Ovidiu
Re: Memory ran out PageRank
Hi,

I think this is the same issue we had before on the list [1]. Stephan recommended the following workaround: a possible workaround is to use the option "setSolutionSetUnmanaged(true)" on the iteration. That will eliminate the fragmentation issue, at least.

Unfortunately, you cannot set this when using graph.run(new PageRank(...)). I created a Gist which shows you how to set this using PageRank:

https://gist.github.com/s1ck/801a8ef97ce374b358df

Please let us know if it worked out for you.

Cheers,
Martin

[1] http://mail-archives.apache.org/mod_mbox/flink-user/201508.mbox/%3CCAELUF_ByPAB%2BPXWLemPzRH%3D-awATeSz4sGz4v9TmnvFku3%3Dx3A%40mail.gmail.com%3E

On 14.03.2016 16:55, Ovidiu-Cristian MARCU wrote:

Hi,

While running PageRank on a synthetic graph I ran into this problem. Any advice on how I should proceed to overcome this memory issue?

    IterationHead(Vertex-centric iteration (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 | org.apache.flink.graph.library.PageRank$RankMesseng$
    java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory: 50659328 Message: Index: 25, Size: 24
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
        at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
        at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)

Thanks!

Best,
Ovidiu
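For readers who just want the shape of the workaround without following the Gist, a hedged sketch (the class and method names follow the Gelly API of the Flink 0.10/1.0 era and may differ in your version; PageRankUpdater and RankMessenger are hypothetical stand-ins for the library's internal functions, which you may need to copy out of the PageRank class, as the Gist does):

    // Configure the iteration so the solution set is kept in unmanaged
    // (on-heap) memory, avoiding the fragmenting CompactingHashTable.
    VertexCentricConfiguration parameters = new VertexCentricConfiguration();
    parameters.setSolutionSetUnmanagedMemory(true);

    // Invoke the iteration directly instead of graph.run(new PageRank(...)),
    // which does not expose the configuration object.
    Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
        new PageRankUpdater(beta, numVertices),   // hypothetical copies of
        new RankMessenger(numVertices),           // the library's functions
        maxIterations,
        parameters);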
Re: LDBC Graph Data into Flink
Hi,

I wrote a short blog post about the ldbc-flink tool, including a short overview of Flink and a Gelly example:

http://ldbcouncil.org/blog/ldbc-and-apache-flink

Best,
Martin

On 06.10.2015 11:00, Martin Junghanns wrote:
> Hi Vasia,
>
> No problem. Sure, Gelly is just a map() call away :)
>
> Best,
> Martin
>
> On 06.10.2015 10:53, Vasiliki Kalavri wrote:
>> Hi Martin,
>>
>> thanks a lot for sharing! This is a very useful tool.
>> I only had a quick look, but if we merge label and payload inside a Tuple2,
>> then it should also be Gelly-compatible :)
>>
>> Cheers,
>> Vasia.
>>
>> On 6 October 2015 at 10:03, Martin Junghanns wrote:
>>
>>> Hi all,
>>>
>>> For our benchmarks with Flink, we are using a data generator provided by
>>> the LDBC project (Linked Data Benchmark Council) [1][2]. The generator uses
>>> MapReduce to create directed, labeled, attributed graphs that mimic
>>> properties of real online social networks (e.g., degree distribution,
>>> diameter). The output is stored in several files, either local or in HDFS.
>>> Each file represents a vertex, edge or multi-valued property class.
>>>
>>> I wrote a little tool that parses and transforms the LDBC output into two
>>> datasets representing vertices and edges. Each vertex has a unique id, a
>>> label and payload according to the LDBC schema. Each edge has a unique id,
>>> a label, source and target vertex IDs and also payload according to the
>>> schema.
>>>
>>> I thought this may be useful for others, so I put it on GitHub [3]. It
>>> currently uses Flink 0.10-SNAPSHOT as it depends on some fixes made in
>>> there.
>>>
>>> Best,
>>> Martin
>>>
>>> [1] http://ldbcouncil.org/
>>> [2] https://github.com/ldbc/ldbc_snb_datagen
>>> [3] https://github.com/s1ck/ldbc-flink-import
Re: Long ids from String
Hi,

Sounds like RDF problems to me :) To build an index, you could do the following (triplets are string tuples like <a, b, c>):

(0) build the set of all triplets (with strings)

    triplets = triplets1.union(triplets2)

(1) assign unique long ids to each vertex

    vertices = triplets.flatMap(/* emit subject and object */).distinct()  // [a, c, ...]
    vertexWithId = vertices.zipWithUniqueId()                              // [<1, a>, <2, c>]

(2) for each triplet dataset, update the source and target identifiers

    triplets1.join(vertexWithId)
        .where(0)      // <a, b, c>
        .equalTo(1)    // <1, a>
        .with(/* replace source string with unique id */)
        .join(vertexWithId)
        .where(2)      // <1, b, c>
        .equalTo(1)    // <2, c>
        .with(/* replace target string with unique id */)  // => <1, b, 2>

(3) store the updated triplet sets for later processing

Of course, this is a lot of computational effort, but it needs to be done only once, and you then have an index of your graphs which you can use for further processing. If your job contains only one join between the triplet datasets, this is clearly not an option. Just an idea :)

Best,
Martin

On 03.11.2015 10:10, Flavio Pompermaier wrote:
> Hi Martin,
> thanks for the suggestion but unfortunately in my use case I have another
> problem: I have to join triplets when f2==f0. Is there any way to translate
> also references? I.e., if I have 2 tuples <a, b, c> and <c, d, e>, when I apply that
> function I obtain something like <1, <a, b, c>>, <2, <c, d, e>>.
> I'd like to be able to join the 1st tuple with the 2nd (so I should know
> that c => 2).
> Which strategy do you think would be the best option to achieve that?
> At the moment I was thinking to persist the ids and use a temporary table
> with an autoincrement long id, but maybe there's a simpler solution.
>
> Best,
> Flavio
>
> On Tue, Nov 3, 2015 at 9:59 AM, Martin Junghanns wrote:
>
>> Hi Flavio,
>>
>> If you just want to assign a unique Long identifier to each element in
>> your dataset, you can use the DataSetUtils.zipWithUniqueId() method [1].
>>
>> Best,
>> Martin
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java#L131
>>
>> On 03.11.2015 09:42, Flavio Pompermaier wrote:
>>> Hi to all,
>>>
>>> I was reading the thread about the Neo4j connector and an old question came
>>> to my mind.
>>>
>>> In my Flink job I have Tuples with String ids that I use to join on, and
>>> I'd like to convert them to long (because Flink should improve quite a lot the
>>> memory usage and the processing time if I'm not wrong).
>>> Is there any recommended way to do that conversion in Flink?
>>>
>>> Best,
>>> Flavio
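A self-contained sketch of these indexing steps, under assumed input types (triplets as Tuple3<String, String, String>; adjust to your schema):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.utils.DataSetUtils;
    import org.apache.flink.util.Collector;

    // (0) all triplets, e.g. <a, b, c>
    DataSet<Tuple3<String, String, String>> triplets = triplets1.union(triplets2);

    // (1) distinct vertex strings (subjects and objects) with unique long ids
    DataSet<String> vertices = triplets
        .flatMap(new FlatMapFunction<Tuple3<String, String, String>, String>() {
            @Override
            public void flatMap(Tuple3<String, String, String> t, Collector<String> out) {
                out.collect(t.f0); // subject
                out.collect(t.f2); // object
            }
        })
        .distinct();
    DataSet<Tuple2<Long, String>> vertexWithId = DataSetUtils.zipWithUniqueId(vertices);

    // (2) replace the subject and object strings by their long ids
    DataSet<Tuple3<Long, String, Long>> indexed = triplets1
        .join(vertexWithId).where(0).equalTo(1)
        .with(new JoinFunction<Tuple3<String, String, String>,
                               Tuple2<Long, String>,
                               Tuple3<Long, String, String>>() {
            @Override
            public Tuple3<Long, String, String> join(
                    Tuple3<String, String, String> t, Tuple2<Long, String> v) {
                return new Tuple3<>(v.f0, t.f1, t.f2); // source string -> id
            }
        })
        .join(vertexWithId).where(2).equalTo(1)
        .with(new JoinFunction<Tuple3<Long, String, String>,
                               Tuple2<Long, String>,
                               Tuple3<Long, String, Long>>() {
            @Override
            public Tuple3<Long, String, Long> join(
                    Tuple3<Long, String, String> t, Tuple2<Long, String> v) {
                return new Tuple3<>(t.f0, t.f1, v.f0); // target string -> id
            }
        });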
Re: Long ids from String
Hi Flavio,

If you just want to assign a unique Long identifier to each element in your dataset, you can use the DataSetUtils.zipWithUniqueId() method [1].

Best,
Martin

[1] https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java#L131

On 03.11.2015 09:42, Flavio Pompermaier wrote:
> Hi to all,
>
> I was reading the thread about the Neo4j connector and an old question came
> to my mind.
>
> In my Flink job I have Tuples with String ids that I use to join on, and
> I'd like to convert them to long (because Flink should improve quite a lot the
> memory usage and the processing time if I'm not wrong).
> Is there any recommended way to do that conversion in Flink?
>
> Best,
> Flavio
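A minimal usage sketch (the id becomes field f0 of the resulting tuples):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.DataSetUtils;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<String> ids = env.fromElements("a", "b", "c");

    // e.g. [<0, a>, <1, b>, <2, c>] -- the ids are unique, but not
    // necessarily consecutive; use DataSetUtils.zipWithIndex for
    // consecutive ids (at the cost of an extra pass).
    DataSet<Tuple2<Long, String>> withLongIds = DataSetUtils.zipWithUniqueId(ids);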
Re: Fwd: Problem applying a groupReduce function to a grouped data set
Hi,

Just an idea: the source code documentation states that projectFirst and projectSecond lose type information, which could explain why your reduceGroup sees a generic Tuple instead of Tuple2<String, Float>. I found an example [1] that calls .types() to define the returned types, but this method is deprecated.

What I would try is to replace projectFirst/projectSecond with a JoinFunction and output the Tuple2 "manually", like so:

    actors.map(new JoinNames())
        .join(weightedRatings)
        .where(1).equalTo(0)
        .with(new JoinFunction<
                TypeLeft, // output type of JoinNames()
                Tuple2<String, Float>,
                Tuple2<String, Float>>() {
            @Override
            public Tuple2<String, Float> join(
                    TypeLeft left, Tuple2<String, Float> right) throws Exception {
                return new Tuple2<>(left.f0, right.f1);
            }
        })
        .withForwardedFieldsFirst("f0")
        .withForwardedFieldsSecond("f1")
        .groupBy(0)
        .reduceGroup(new MeanRatingCalculator())
        .first(10)
        .print();

Hope this helps.

Best,
Martin

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/examples.html#relational-query

On 01.11.2015 13:52, Lea Helmers wrote:

Hi!

When I try to apply a groupReduce function to a dataset, I get an error. The dataset is created like this:

    DataSet<Tuple3<String, String, String>> actorsTemp = env
        .readCsvFile("/home/lea/Documents/impro3_ws15/actors.tsv")
        .fieldDelimiter("\t")
        .includeFields("1110")
        .types(String.class, String.class, String.class);

    DataSet<Tuple3<String, String, String>> actresses = env
        .readCsvFile("/home/lea/Documents/impro3_ws15/actresses.tsv")
        .fieldDelimiter("\t")
        .includeFields("1110")
        .types(String.class, String.class, String.class);

    DataSet<Tuple3<Float, Float, String>> ratings = env
        .readCsvFile("/home/lea/Documents/impro3_ws15/ratings.tsv")
        .fieldDelimiter("\t")
        .includeFields("0111")
        .types(Float.class, Float.class, String.class)
        .filter(new NumberVotesFilter());

    // merge actors and actresses
    DataSet<Tuple3<String, String, String>> actors = actorsTemp.union(actresses);

    // create weighted rating
    DataSet<Tuple2<String, Float>> weightedRatings =
        ratings.map(new WeightedRatingCalculator());

This is what I'm trying in the main method:

    actors.map(new JoinNames())
        .join(weightedRatings)
        .where(1).equalTo(0)
        .projectFirst(0).projectSecond(1)
        .groupBy(0)
        .reduceGroup(new MeanRatingCalculator())
        .first(10).print();

And here is the GroupReduce function I wrote:

    public static class MeanRatingCalculator implements
            GroupReduceFunction<Tuple2<String, Float>, Tuple3<String, Float, Integer>> {

        public void reduce(Iterable<Tuple2<String, Float>> ratedActors,
                Collector<Tuple3<String, Float, Integer>> out) throws Exception {
            String name = null;
            Float ratings = 0F;
            int numberOfMovies = 0;
            for (Tuple2<String, Float> a : ratedActors) {
                // store the name
                name = a.f0;
                // update the sum of the ratings and the number of movies
                ratings += a.f1;
                numberOfMovies++;
            }
            // emit name, average rating and number of films
            out.collect(new Tuple3<>(name, ratings / (float) numberOfMovies, numberOfMovies));
        }
    }

I get the following error message when I try to compile the code:

    java: method reduceGroup in class org.apache.flink.api.java.operators.UnsortedGrouping cannot be applied to given types;
    required: org.apache.flink.api.common.functions.GroupReduceFunction
    found: de.tub.dima.TopActors.MeanRatingCalculator
    reason: no instance(s) of type variable(s) R exist so that argument type de.tub.dima.TopActors.MeanRatingCalculator conforms to formal parameter type org.apache.flink.api.common.functions.GroupReduceFunction

I can't figure out what the problem might be and would be very grateful for any help! I hope I have given all the necessary information. I'm using Ubuntu 14.04 and IntelliJ IDEA as my IDE.

Thank you very much,
Lea
Re: LDBC Graph Data into Flink
Hi Vasia,

No problem. Sure, Gelly is just a map() call away :)

Best,
Martin

On 06.10.2015 10:53, Vasiliki Kalavri wrote:
> Hi Martin,
>
> thanks a lot for sharing! This is a very useful tool.
> I only had a quick look, but if we merge label and payload inside a Tuple2,
> then it should also be Gelly-compatible :)
>
> Cheers,
> Vasia.
>
> On 6 October 2015 at 10:03, Martin Junghanns wrote:
>
>> Hi all,
>>
>> For our benchmarks with Flink, we are using a data generator provided by
>> the LDBC project (Linked Data Benchmark Council) [1][2]. The generator uses
>> MapReduce to create directed, labeled, attributed graphs that mimic
>> properties of real online social networks (e.g., degree distribution,
>> diameter). The output is stored in several files, either local or in HDFS.
>> Each file represents a vertex, edge or multi-valued property class.
>>
>> I wrote a little tool that parses and transforms the LDBC output into two
>> datasets representing vertices and edges. Each vertex has a unique id, a
>> label and payload according to the LDBC schema. Each edge has a unique id,
>> a label, source and target vertex IDs and also payload according to the
>> schema.
>>
>> I thought this may be useful for others, so I put it on GitHub [3]. It
>> currently uses Flink 0.10-SNAPSHOT as it depends on some fixes made in
>> there.
>>
>> Best,
>> Martin
>>
>> [1] http://ldbcouncil.org/
>> [2] https://github.com/ldbc/ldbc_snb_datagen
>> [3] https://github.com/s1ck/ldbc-flink-import
LDBC Graph Data into Flink
Hi all,

For our benchmarks with Flink, we are using a data generator provided by the LDBC project (Linked Data Benchmark Council) [1][2]. The generator uses MapReduce to create directed, labeled, attributed graphs that mimic properties of real online social networks (e.g., degree distribution, diameter). The output is stored in several files, either local or in HDFS. Each file represents a vertex, edge or multi-valued property class.

I wrote a little tool that parses and transforms the LDBC output into two datasets representing vertices and edges. Each vertex has a unique id, a label and payload according to the LDBC schema. Each edge has a unique id, a label, source and target vertex IDs, and also payload according to the schema.

I thought this may be useful for others, so I put it on GitHub [3]. It currently uses Flink 0.10-SNAPSHOT as it depends on some fixes made in there.

Best,
Martin

[1] http://ldbcouncil.org/
[2] https://github.com/ldbc/ldbc_snb_datagen
[3] https://github.com/s1ck/ldbc-flink-import
Re: Inheritance and FlatMap with custom POJO
Hi Giacomo,

I ran into the same issue. It seems to be coupled to the serialization mechanism of UDFs. I solved it by letting the base class implement the UDF interface (e.g., FlatMapFunction) and, in addition, making it generic (which should be necessary in your example):

    public [abstract] class CullTimeBase<T> implements FlatMapFunction<T, T> {
        // ...
    }

    public class CullTimeRainfall extends CullTimeBase<RainfallPOJO> {
        // ...
    }

This should work.

Best,
Martin

On 16.09.2015 10:41, Giacomo Licari wrote:

Hi guys,

I'm trying to create a base class which is inherited by classes implementing the flatMap method on specific POJO types. It seems inheritance doesn't work: I can access this.propertyName or super.propertyName from the flatMap method, but the values are always null.

Here is the derived class, using RainfallPOJO:

    public class CullTimeRainfall extends CullTimeBase
            implements FlatMapFunction<RainfallPOJO, RainfallPOJO> {

        public CullTimeRainfall(int num, int den, String time_data_name,
                String start_time, String end_time, int interval, String time_unit) {
            super(num, den, time_data_name, start_time, end_time, interval, time_unit);
        }

        public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) throws Exception {
            DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss.SSS");
            try {
                Date time = formatter.parse(obj.getTime().replaceAll(
                        "([0-9\\-T]+:[0-9]{2}:[0-9.+]+):([0-9]{2})", "$1$2"));
                if (time.after(this.startTime) && time.before(this.endTime)) {
                    coll.collect(obj);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

My base class is:

    public class CullTimeBase {
        protected int numerator;
        protected int denominator;
        protected String timeDataName;
        protected Date startTime;
        protected Date endTime;
        protected int interval;
        protected String timeUnit;

        public CullTimeBase(int num, int den, String time_data_name,
                String start_time, String end_time, int interv, String time_unit) {
            numerator = num;
            denominator = den;
            timeDataName = time_data_name;
            interval = interv;
            timeUnit = time_unit;
            DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
            try {
                startTime = formatter.parse(start_time);
                endTime = formatter.parse(end_time);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }

It works only if I declare all variables and methods in a single class, but then I would have to repeat the same properties in several classes. I would like to specialize each derived class only with a custom flatMap method that uses a custom POJO type.

Thanks a lot,
Giacomo
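One plausible explanation (an assumption, not stated in the thread): Flink ships UDF instances via Java serialization, and fields declared in a non-Serializable superclass are not serialized with the object, so they can come back empty on the workers. Letting the base class itself implement FlatMapFunction (which extends Serializable) avoids this. A minimal sketch, with date parsing omitted and getTimeAsDate() as a hypothetical accessor on the thread's RainfallPOJO:

    import java.util.Date;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    // Generic base class: carries the shared fields and implements the UDF
    // interface, so the whole object is serialized together.
    abstract class CullTimeBase<T> implements FlatMapFunction<T, T> {
        protected final Date startTime;
        protected final Date endTime;

        protected CullTimeBase(Date startTime, Date endTime) {
            this.startTime = startTime;
            this.endTime = endTime;
        }
    }

    // Derived class: only contributes the POJO-specific flatMap logic.
    class CullTimeRainfall extends CullTimeBase<RainfallPOJO> {
        CullTimeRainfall(Date start, Date end) {
            super(start, end);
        }

        @Override
        public void flatMap(RainfallPOJO obj, Collector<RainfallPOJO> coll) {
            Date time = obj.getTimeAsDate(); // hypothetical accessor
            if (time.after(startTime) && time.before(endTime)) {
                coll.collect(obj);
            }
        }
    }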
Re: Containment Join Support
Hi Fabian, hi Stephan,

thanks for answering my question. Good hint with the list replication, I will benchmark this vs. cross + filter.

Best,
Martin

On 17.07.2015 at 11:17, Stephan Ewen wrote:

I would rewrite this to replicate the list into tuples: "foreach x in list: emit (x, list)". Then join on field 0. This replicates the lists, but makes the join very efficient.

On Fri, Jul 17, 2015 at 12:26 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Martin,

good to hear that you like Flink :-)

AFAIK, there are no plans to add a containment join. The Flink community is currently working on adding support for outer joins. Regarding a containment join, I am not sure about the number of use cases. I would rather try to implement it on top of Flink's batch API instead of adding it as an internal feature/operator to the system, because this would touch a lot of things (API, optimizer, operator implementation).

There might be better ways to implement a containment join than using a cross and a filter.
- Do you know a distributed algorithm for containment joins? Maybe it can be implemented with Flink's API.
- I guess you are implementing a generic graph framework, but can you make certain assumptions about the data, such as the relative sizes of the inputs or the avg/max size of the lists?

Contributions to Gelly (and Flink in general) are highly welcome.

Best,
Fabian

2015-07-16 9:39 GMT+02:00 Martin Junghanns <martin.jungha...@gmx.net>:

Hi everyone,

At first, thanks for building this great framework! We are using Flink and especially Gelly for building a graph analytics stack (gradoop.com).

I was wondering if there is [planned] support for a containment join operator. Consider the following example:

    DataSet<List<Long>> left := {[0, 1], [2, 3, 4], [5]}
    DataSet<Tuple2<Long, Long>> right := {<0, 1>, <1, 0>, <2, 1>, <5, 2>}

What I want to compute is:

    left.join(right).where(list).contains(tuple.f0) :=
      { <[0, 1], <0, 1>>, <[0, 1], <1, 0>>, <[2, 3, 4], <2, 1>>, <[5], <5, 2>> }

At the moment, I am solving that using cross and filter, which can be expensive. The generalization of that operator would be a "set containment join", where you join if the right set is contained in the left set.

If there is a general need for that operator, I would also like to contribute to its implementation. But maybe there is already another nice solution which I didn't discover yet? Any help would be appreciated. Especially since I would also like to contribute some of our graph operators (e.g., graph summarization) back to Flink/Gelly (the current WIP state can be found here: [1]).

Thanks,
Martin

[1] https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/gradoop-flink/src/main/java/org/gradoop/model/impl/operators/Summarization.java
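In Java, Stephan's replicate-and-join suggestion could look roughly like the following sketch (assumed element types taken from the example below: the left input holds List<Long>, the right input Tuple2<Long, Long>):

    import java.util.List;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // 1) Replicate each list once per contained element: emit (x, list).
    DataSet<Tuple2<Long, List<Long>>> replicated = left
        .flatMap(new FlatMapFunction<List<Long>, Tuple2<Long, List<Long>>>() {
            @Override
            public void flatMap(List<Long> list,
                    Collector<Tuple2<Long, List<Long>>> out) {
                for (Long x : list) {
                    out.collect(new Tuple2<>(x, list));
                }
            }
        });

    // 2) An efficient equi-join on the replicated element instead of
    //    cross + filter.
    DataSet<Tuple2<List<Long>, Tuple2<Long, Long>>> joined = replicated
        .join(right)
        .where(0)    // the replicated list element x
        .equalTo(0)  // tuple.f0
        .with(new JoinFunction<Tuple2<Long, List<Long>>, Tuple2<Long, Long>,
                               Tuple2<List<Long>, Tuple2<Long, Long>>>() {
            @Override
            public Tuple2<List<Long>, Tuple2<Long, Long>> join(
                    Tuple2<Long, List<Long>> l, Tuple2<Long, Long> r) {
                return new Tuple2<>(l.f1, r);
            }
        });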
Containment Join Support
Hi everyone,

At first, thanks for building this great framework! We are using Flink and especially Gelly for building a graph analytics stack (gradoop.com).

I was wondering if there is [planned] support for a containment join operator. Consider the following example:

    DataSet<List<Long>> left := {[0, 1], [2, 3, 4], [5]}
    DataSet<Tuple2<Long, Long>> right := {<0, 1>, <1, 0>, <2, 1>, <5, 2>}

What I want to compute is:

    left.join(right).where(list).contains(tuple.f0) :=
      { <[0, 1], <0, 1>>, <[0, 1], <1, 0>>, <[2, 3, 4], <2, 1>>, <[5], <5, 2>> }

At the moment, I am solving that using cross and filter, which can be expensive. The generalization of that operator would be a "set containment join", where you join if the right set is contained in the left set.

If there is a general need for that operator, I would also like to contribute to its implementation. But maybe there is already another nice solution which I didn't discover yet? Any help would be appreciated. Especially since I would also like to contribute some of our graph operators (e.g., graph summarization) back to Flink/Gelly (the current WIP state can be found here: [1]).

Thanks,
Martin

[1] https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/gradoop-flink/src/main/java/org/gradoop/model/impl/operators/Summarization.java
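For comparison, the cross + filter variant mentioned above could look like this sketch (same assumed types as in the example; the cross materializes every (list, tuple) pair, which is what makes it expensive):

    import java.util.List;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;

    DataSet<Tuple2<List<Long>, Tuple2<Long, Long>>> contained = left
        .cross(right)  // builds every (list, tuple) pair -- quadratic!
        .filter(new FilterFunction<Tuple2<List<Long>, Tuple2<Long, Long>>>() {
            @Override
            public boolean filter(Tuple2<List<Long>, Tuple2<Long, Long>> pair) {
                return pair.f0.contains(pair.f1.f0);
            }
        });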