[jira] [Commented] (FLINK-2548) In a VertexCentricIteration, the run time of one iteration should be proportional to the size of the workset
[ https://issues.apache.org/jira/browse/FLINK-2548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14707946#comment-14707946 ] Vasia Kalavri commented on FLINK-2548: -- [~ggevay], an approach similar to what you are describing is implemented by the GatherSumApplyIteration, i.e. join + reduce + join. I have also found out in my experiments that this implementation is faster than the vertex-centric in many cases, but also might be slower, depending on the dataset and algorithm. That's why we support both models and let the user decide which one to use based on their needs. I don't think changing the vertex-centric implementation to switch between coGroup and join is a good idea. I think that the two supported iteration models in Gelly cover the majority of common use-cases. For more custom solutions one can always write their own delta iteration. > In a VertexCentricIteration, the run time of one iteration should be > proportional to the size of the workset > > > Key: FLINK-2548 > URL: https://issues.apache.org/jira/browse/FLINK-2548 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 0.9, 0.10 >Reporter: Gabor Gevay >Assignee: Gabor Gevay > > Currently, the performance of vertex centric iteration is suboptimal in those > iterations where the workset is small, because the complexity of one > iteration contains the number of edges and vertices of the graph because of > coGroups: > VertexCentricIteration.buildMessagingFunction does a coGroup between the > edges and the workset, to get the neighbors to the messaging UDF. This is > problematic from a performance point of view, because the coGroup UDF gets > called on all the edge groups, including those that are not getting any > messages. 
> An analogous problem is present in > VertexCentricIteration.createResultSimpleVertex at the creation of the > updates: a coGroup happens between the messages and the solution set, which > has the number of vertices of the graph included in its complexity. > Both of these coGroups could be avoided by doing a join instead (with the > same keys that the coGroup uses), and then a groupBy. The complexity of these > operations would be dominated by the size of the workset, as opposed to the > number of edges or vertices of the graph. The joins should have the edges and > the solution set at the build side to achieve this complexity. (They will not > be rebuilt at every iteration.) > I made some experiments with this, and the initial results seem promising. On > some workloads, this achieves a 2 times speedup, because later iterations > often have quite small worksets, and these get a huge speedup from this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
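To make the proportionality argument above concrete, here is a toy sketch in plain Java (deliberately not the Flink DataSet API; all names are illustrative): the coGroup-style variant invokes its UDF once per edge group regardless of activity, while the join-style variant only probes the edge index with workset entries, so its work is bounded by the workset size.

```java
import java.util.*;

// Toy simulation of the two message-generation strategies discussed above.
// This is NOT the Flink API; it only illustrates why the join-based variant's
// work is proportional to the workset, while the coGroup-based variant always
// touches every edge group.
class MessagingSketch {

    // coGroup-style: visits every source vertex's edge group, active or not.
    static int coGroupStyle(Map<Integer, List<Integer>> edges, Set<Integer> workset) {
        int groupsVisited = 0;
        for (Map.Entry<Integer, List<Integer>> group : edges.entrySet()) {
            groupsVisited++;              // UDF invoked even for inactive groups
            if (workset.contains(group.getKey())) {
                // would emit messages to group.getValue() here
            }
        }
        return groupsVisited;
    }

    // join-style: probes the edge index only with workset entries.
    static int joinStyle(Map<Integer, List<Integer>> edges, Set<Integer> workset) {
        int groupsVisited = 0;
        for (Integer active : workset) {
            List<Integer> targets = edges.get(active);   // hash-join probe
            if (targets != null) {
                groupsVisited++;          // work bounded by |workset|
            }
        }
        return groupsVisited;
    }
}
```

With 1000 edge groups and a two-element workset, the first variant does 1000 units of work per iteration while the second does 2, which is the effect the speedup numbers above attribute to small late-iteration worksets.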
[jira] [Commented] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set
[ https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698399#comment-14698399 ] Vasia Kalavri commented on FLINK-2527: -- I think (3) would break the model semantics. I'm leaning towards (1). [~ggevay] do you have any case in mind that (2) would allow to implement but (1) wouldn't? > If a VertexUpdateFunction calls setNewVertexValue more than once, the > MessagingFunction will only see the first value set > - > > Key: FLINK-2527 > URL: https://issues.apache.org/jira/browse/FLINK-2527 > Project: Flink > Issue Type: Bug > Components: Gelly >Reporter: Gabor Gevay >Assignee: Gabor Gevay > Fix For: 0.10, 0.9.1 > > > The problem is that if setNewVertexValue is called more than once, it sends > each new value to the out Collector, and these all end up in the workset, but > then the coGroups in the two descendants of MessagingUdfWithEdgeValues use > only the first value in the state Iterable. I see three ways to resolve this: > 1. Add it to the documentation that setNewVertexValue should only be called > once, and optionally add a check for this. > 2. In setNewVertexValue, do not send the newValue to the out Collector at > once, but only record it in outVal, and send the last recorded value after > updateVertex returns. > 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup > and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need > some documentation addition.) > I like 2. the best. What are your opinions?
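For reference, option (2) from the issue text could look roughly like the following sketch. This is plain Java with made-up names (DeferredVertexUpdate, flushTo), not the actual Gelly classes; it only shows the "record in outVal, emit the last recorded value after updateVertex returns" idea.

```java
import java.util.*;

// Sketch of option (2): instead of forwarding every setNewVertexValue call
// to the collector immediately, remember only the last value and emit it
// once after updateVertex returns. Class and method names are illustrative,
// not the real Gelly API.
class DeferredVertexUpdate<VV> {
    private VV outVal;
    private boolean updated;

    // What the user's updateVertex would call, possibly several times.
    void setNewVertexValue(VV newValue) {
        outVal = newValue;   // overwrite; nothing is emitted yet
        updated = true;
    }

    // Called by the framework after updateVertex returns: at most one
    // element reaches the workset, so downstream coGroups see one value.
    void flushTo(List<VV> collector) {
        if (updated) {
            collector.add(outVal);
            updated = false;
        }
    }
}
```

The point of deferring is that the workset then never contains two values for the same vertex, so the "only the first value in the state Iterable" problem cannot arise downstream.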
[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API v2
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1004#issuecomment-131169939 +1 from me too. Thanks for your great work @PieterJanVanAeken! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [Flink-Gelly] [example] added missing assumpti...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/883#issuecomment-130623223 I've actually included this in #1000. Could you close this PR @samk3211? Thanks!
[GitHub] flink pull request: [FLINK-2451] [gelly] examples and library clea...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1000#discussion_r36963490 --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java --- @@ -86,26 +72,35 @@ public void testConnectedComponents() throws Exception { @Test public void testSingleSourceShortestPaths() throws Exception { - GSASingleSourceShortestPaths.main(new String[]{"1", edgesPath, resultPath, "16"}); - expectedResult = "1 0.0\n" + - "2 12.0\n" + - "3 13.0\n" + - "4 47.0\n" + - "5 48.0\n" + - "6 Infinity\n" + - "7 Infinity\n"; + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph inputGraph = Graph.fromDataSet( + SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), + new InitMapperSSSP(), env); + +List> result = inputGraph.run(new GSASingleSourceShortestPaths(1l, 16)) + .getVertices().collect(); --- End diff -- Yes, we do! The idea is to lazily migrate the rest of the tests, too. Take a look at FLINK-2032.
[jira] [Updated] (FLINK-1528) Add local clustering coefficient library method and example
[ https://issues.apache.org/jira/browse/FLINK-1528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri updated FLINK-1528: - Description: Add a gelly library method to compute the local clustering coefficient. (was: Add a gelly library method and example to compute the local clustering coefficient.) > Add local clustering coefficient library method and example > --- > > Key: FLINK-1528 > URL: https://issues.apache.org/jira/browse/FLINK-1528 > Project: Flink > Issue Type: Task > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > Add a gelly library method to compute the local clustering coefficient.
[jira] [Assigned] (FLINK-1528) Add local clustering coefficient library method and example
[ https://issues.apache.org/jira/browse/FLINK-1528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri reassigned FLINK-1528: Assignee: Vasia Kalavri (was: Daniel Bali) > Add local clustering coefficient library method and example > --- > > Key: FLINK-1528 > URL: https://issues.apache.org/jira/browse/FLINK-1528 > Project: Flink > Issue Type: Task > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > Add a gelly library method and example to compute the local clustering > coefficient.
[jira] [Updated] (FLINK-1707) Add an Affinity Propagation Library Method
[ https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri updated FLINK-1707: - Assignee: (was: joey) > Add an Affinity Propagation Library Method > -- > > Key: FLINK-1707 > URL: https://issues.apache.org/jira/browse/FLINK-1707 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Vasia Kalavri >Priority: Minor > > This issue proposes adding an implementation of the Affinity Propagation > algorithm as a Gelly library method and a corresponding example. > The algorithm is described in paper [1] and a description of a vertex-centric > implementation can be found in [2]. > [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf > [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
[GitHub] flink pull request: [FLINK-1707][WIP]Add an Affinity Propagation L...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/649#issuecomment-129226226 Thanks for the fast response and your kind words :) Glad to have collaborated with you, too!
[GitHub] flink pull request: [FLINK-1528][Gelly] Added Local Clustering Coe...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/420#issuecomment-129226158 That's fine :) Thanks for the fast response!
[GitHub] flink pull request: [FLINK-1707][WIP]Add an Affinity Propagation L...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/649#issuecomment-129223427 Hi @joey001, any news regarding this PR? If you have no time to work on this, that's fine! You can close the PR and someone will hopefully pick up the issue. Just let us know :) Thank you!
[GitHub] flink pull request: [FLINK-1885] [gelly] Added bulk execution mode...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/598#issuecomment-129223234 Hi @markus-h, I'm so sorry it took me so long to look into this. I agree with Stephan's comment, and it would also be great if we could add this option to gather-sum-apply. Would you like to try to rebase?
[GitHub] flink pull request: [FLINK-1528][Gelly] Added Local Clustering Coe...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/420#issuecomment-129222936 @balidani, I think it'd be better if you close this PR. I don't think we'll add another example after #1000 is merged. I can take over and probably reuse some of your code to add a local clustering coefficient library method. Would that be OK? Thanks!
[GitHub] flink pull request: [FLINK-2451] [gelly] examples and library clea...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1000#issuecomment-129175273 ha! I hadn't noticed! That's nice :))
[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/808#issuecomment-128681998 I think it's a good idea and it will probably save you time :) Thank you!
[GitHub] flink pull request: [FLINK-2451] [gelly] examples and library clea...
GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/1000 [FLINK-2451] [gelly] examples and library cleanup This PR contains a refactoring and cleanup of the gelly examples and library methods, as discussed in the mailing list. Tests have been adjusted accordingly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink examples-cleanup Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1000.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1000 commit ce0119a059b3acd0a75c01f53e484664a0dc5541 Author: vasia Date: 2015-08-03T12:37:19Z [FLINK-2451] [gelly] removed redundant examples; added comments describing which gelly method each example illustrates. commit 721137ede022aafa434f210ccd65ac05c372c68e Author: vasia Date: 2015-08-03T13:20:44Z [FLINK-2451] [gelly] library methods cleanup commit 2a90178b7d629cceb46a0431ef39cb9fd91b7c41 Author: vasia Date: 2015-08-07T09:28:20Z [FLINK-2451] [gelly] re-organized tests; compare with collect() instead of temp files where possible
[jira] [Resolved] (FLINK-2452) Add a playcount threshold to the MusicProfiles example
[ https://issues.apache.org/jira/browse/FLINK-2452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri resolved FLINK-2452. -- Resolution: Fixed > Add a playcount threshold to the MusicProfiles example > -- > > Key: FLINK-2452 > URL: https://issues.apache.org/jira/browse/FLINK-2452 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 0.10 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri >Priority: Minor > Fix For: 0.10 > > > In the MusicProfiles example, when creating the user-user similarity graph, > an edge is created between any 2 users that have listened to the same song > (even if once). Depending on the input data, this might produce a projection > graph with many more edges than the original user-song graph. > To make this computation more efficient, this issue proposes adding a > user-defined parameter that filters out songs that a user has listened to > only a few times. Essentially, it is a threshold for playcount, above which a > user is considered to like a song. > For reference, with a threshold value of 30, the whole Last.fm dataset is > analyzed on my laptop in a few minutes, while no threshold results in a > runtime of several hours. > There are many solutions to this problem, but since this is just an example > (not a library method), I think that keeping it simple is important. > Thanks to [~andralungu] for spotting the inefficiency!
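The thresholding described above amounts to a simple filter on (user, song, playcount) triplets before the user-user projection is built. A minimal sketch in plain Java, with an assumed Triplet layout rather than the actual MusicProfiles code:

```java
import java.util.*;

// Sketch of the playcount threshold: keep only triplets whose playcount
// exceeds the user-supplied threshold, i.e. songs the user is considered
// to "like". Class and field names are illustrative assumptions.
class PlaycountThreshold {
    static final class Triplet {
        final String user, song;
        final int playcount;
        Triplet(String user, String song, int playcount) {
            this.user = user; this.song = song; this.playcount = playcount;
        }
    }

    static List<Triplet> likedSongs(List<Triplet> triplets, int threshold) {
        List<Triplet> liked = new ArrayList<>();
        for (Triplet t : triplets) {
            if (t.playcount > threshold) {   // above threshold => user "likes" it
                liked.add(t);
            }
        }
        return liked;
    }
}
```

Because every surviving triplet can contribute edges to the projection graph, shrinking this list up front is what turns the multi-hour run into minutes in the experiment mentioned above.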
[jira] [Updated] (FLINK-2452) Add a playcount threshold to the MusicProfiles example
[ https://issues.apache.org/jira/browse/FLINK-2452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri updated FLINK-2452: - Fix Version/s: 0.10
[GitHub] flink pull request: [FLINK-2240] Use BloomFilter to filter probe r...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/888#issuecomment-128620294 Makes sense. Thank you both!
[GitHub] flink pull request: [FLINK-2240] Use BloomFilter to filter probe r...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/888#issuecomment-128445757 Hi, this looks great indeed! Just out of curiosity, why did you write your own bloom filter implementation and not use a ready one, e.g. from guava? I'm wondering because in #923 we also want to use a bloom filter for an approximate algorithm implementation. Thanks!
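For readers unfamiliar with the technique under discussion: a Bloom filter answers "might this key be on the build side?" with no false negatives, so probe-side records it rejects can be skipped safely. A toy two-hash version in plain Java (not Flink's or Guava's implementation; the hash constants are arbitrary choices for illustration):

```java
import java.util.*;

// Toy Bloom filter illustrating probe-side filtering in a hash join:
// build-side keys set bits; a probe record whose bits are not all set is
// definitely not on the build side and can be dropped early.
class BloomSketch {
    private final BitSet bits;
    private final int size;

    BloomSketch(int size) {
        this.size = size;
        this.bits = new BitSet(size);
    }

    // Two simple illustrative hash functions (arbitrary mixing constants).
    private int h1(int key) { return Math.floorMod(key * 0x9E3779B9, size); }
    private int h2(int key) { return Math.floorMod(key ^ (key >>> 16), size); }

    void add(int key) {          // called for every build-side key
        bits.set(h1(key));
        bits.set(h2(key));
    }

    // false => key definitely absent from the build side (no false negatives);
    // true  => key is possibly present (false positives are allowed).
    boolean mightContain(int key) {
        return bits.get(h1(key)) && bits.get(h2(key));
    }
}
```

The safety property is one-sided: rejections are always correct, while a "true" answer still requires the real hash-table probe, which is why the filter can only reduce work, never change the join result.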
[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/808#issuecomment-128420592 Hi @PieterJanVanAeken! Thanks for the update. It seems something went wrong with your merge. Your last commit shows 1000+ files changed... Could you try to rebase instead?
[GitHub] flink pull request: [FLINK-2452] [Gelly] adds a playcount threshol...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/968#issuecomment-128413716 Thanks @fhueske! Any comment @andralungu? Otherwise, I'll merge this :)
[GitHub] flink pull request: [FLINK-2452] [Gelly] adds a playcount threshol...
GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/968 [FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example This PR adds a user-defined parameter to the MusicProfiles example that filters out songs that a user has listened to only a few times. Essentially, it is a threshold for playcount, above which a user is considered to like a song. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink music-profiles Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/968.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #968 commit c0c8463521912d021c392c2c5edc254fee267eb8 Author: vasia Date: 2015-07-31T20:12:18Z [FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example
[jira] [Updated] (FLINK-2452) Add a playcount threshold to the MusicProfiles example
[ https://issues.apache.org/jira/browse/FLINK-2452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri updated FLINK-2452: - Summary: Add a playcount threshold to the MusicProfiles example (was: Add a playcount threshold to the MusicProfiles examples)
[jira] [Created] (FLINK-2452) Add a playcount threshold to the MusicProfiles examples
Vasia Kalavri created FLINK-2452: Summary: Add a playcount threshold to the MusicProfiles examples Key: FLINK-2452 URL: https://issues.apache.org/jira/browse/FLINK-2452 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.10 Reporter: Vasia Kalavri Assignee: Vasia Kalavri Priority: Minor In the MusicProfiles example, when creating the user-user similarity graph, an edge is created between any 2 users that have listened to the same song (even if once). Depending on the input data, this might produce a projection graph with many more edges than the original user-song graph. To make this computation more efficient, this issue proposes adding a user-defined parameter that filters out songs that a user has listened to only a few times. Essentially, it is a threshold for playcount, above which a user is considered to like a song. For reference, with a threshold value of 30, the whole Last.fm dataset is analyzed on my laptop in a few minutes, while no threshold results in a runtime of several hours. There are many solutions to this problem, but since this is just an example (not a library method), I think that keeping it simple is important. Thanks to [~andralungu] for spotting the inefficiency!
[jira] [Created] (FLINK-2451) Cleanup Gelly examples
Vasia Kalavri created FLINK-2451: Summary: Cleanup Gelly examples Key: FLINK-2451 URL: https://issues.apache.org/jira/browse/FLINK-2451 Project: Flink Issue Type: Improvement Components: Gelly Affects Versions: 0.10 Reporter: Vasia Kalavri Assignee: Vasia Kalavri Priority: Minor As per discussion in the dev@ mailing list, this issue proposes the following changes to the Gelly examples and library: 1. Keep the following examples as they are: EuclideanGraphWeighing, GraphMetrics, IncrementalSSSP, JaccardSimilarity, MusicProfiles. 2. Keep only 1 example to show how to use library methods. 3. Add 1 example for vertex-centric iterations. 4. Keep 1 example for GSA iterations and move the redundant GSA implementations to the library. 5. Improve the examples documentation and refer to the functionality that each of them demonstrates. 6. Port and modify existing example tests accordingly.
[jira] [Updated] (FLINK-2411) Add basic graph summarization algorithm
[ https://issues.apache.org/jira/browse/FLINK-2411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri updated FLINK-2411: - Assignee: Martin Junghanns > Add basic graph summarization algorithm > --- > > Key: FLINK-2411 > URL: https://issues.apache.org/jira/browse/FLINK-2411 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: master >Reporter: Martin Junghanns >Assignee: Martin Junghanns >Priority: Minor > > Graph summarization determines a structural grouping of similar vertices and > edges to condense a graph and thus helps to uncover insights about patterns > hidden in the graph. It can be used in OLAP-style operations on the graph and > is similar to group by in SQL but on the graph structure instead of rows. > > The graph summarization operator represents every vertex group by a single > vertex in the summarized graph; edges between vertices in the summary graph > represent a group of edges between the vertex group members of the original > graph. Summarization is defined by specifying grouping keys for vertices and > edges, respectively. > One publication that presents a Map/Reduce based approach is "Pagrol: > Parallel graph olap over large-scale attributed graphs", however they > pre-compute the graph-cube before it can be analyzed. With Flink, we can give > the user an interactive way of summarizing the graph and do not need to > compute the cube beforehand. > A more complex approach focuses on summarization on graph patterns > "SynopSys: Large Graph Analytics in the SAP HANA Database Through > Summarization". > However, I want to start with a simple algorithm that summarizes the graph on > vertex and optionally edge values and additionally stores a count aggregate > at summarized vertices/edges. > Consider the following two examples (e.g., social network with users from > cities and friendships with timestamp): > > h4. 
Input graph: > > Vertices (id, value): > (0, Leipzig) > (1, Leipzig) > (2, Dresden) > (3, Dresden) > (4, Dresden) > (5, Berlin) > Edges (source, target, value): > (0, 1, 2014) > (1, 0, 2014) > (1, 2, 2013) > (2, 1, 2013) > (2, 3, 2014) > (3, 2, 2014) > (4, 0, 2013) > (4, 1, 2015) > (5, 2, 2015) > (5, 3, 2015) > h4. Output graph (summarized on vertex value): > Vertices (id, value, count) > (0, Leipzig, 2) // "2 users from Leipzig" > (2, Dresden, 3) // "3 users from Dresden" > (5, Berlin, 1) // "1 user from Berlin" > Edges (source, target, count) > (0, 0, 2) // "2 edges between users in Leipzig" > (0, 2, 1) // "1 edge from users in Leipzig to users in Dresden" > (2, 0, 3) // "3 edges from users in Dresden to users in Leipzig" > (2, 2, 2) // "2 edges between users in Dresden" > (5, 2, 2) // "2 edges from users in Berlin to users in Dresden" > h4. Output graph (summarized on vertex and edge value): > Vertices (id, value, count) > (0, Leipzig, 2) > (2, Dresden, 3) > (5, Berlin, 1) > Edges (source, target, value, count) > (0, 0, 2014, 2) // ... > (0, 2, 2013, 1) // ... > (2, 0, 2013, 2) // "2 edges from users in Dresden to users in Leipzig with > timestamp 2013" > (2, 0, 2015, 1) // "1 edge from users in Dresden to users in Leipzig with > timestamp 2015" > (2, 2, 2014, 2) // ... > (5, 2, 2015, 2) // ... > I've already implemented two versions of the summarization algorithm in our > own project [Gradoop|https://github.com/dbs-leipzig/gradoop], which is a > graph analytics stack on top of Hadoop + Gelly/Flink with a fixed data model. 
> You can see the current WIP here: > 1 [Abstract > summarization|https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/gradoop-flink/src/main/java/org/gradoop/model/impl/operators/Summarization.java] > 2 [Implementation using > cross|https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/gradoop-flink/src/main/java/org/gradoop/model/impl/operators/SummarizationCross.java] > 3 [Implementation using > joins|https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/gradoop-flink/src/main/java/org/gradoop/model/impl/operators/SummarizationJoin.java] > 4 > [Tests|https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/gradoop-flink/src/test/java/org/gradoop/model/impl/EPGraphSummarizeTest.java] > 5 > [TestGraph|https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/dev-support/social-network.pdf] > I would basically use the same implementation.
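A minimal sketch of the vertex-value-only summarization described above, using plain Java collections instead of the Gradoop/Flink operators linked in the issue. Choosing the smallest vertex id in each group as the group representative is an assumption made here for illustration; it happens to reproduce the representatives (0, 2, 5) in the example output.

```java
import java.util.*;

// Toy summarization on the vertex value only: vertices are grouped by value
// (e.g. city), each group is represented by its lowest vertex id, and edges
// are re-pointed to group representatives and counted. Not the Gradoop code.
class SummarizationSketch {
    // Returns "repSource->repTarget" -> edge count for the summarized graph.
    static Map<String, Integer> summarizeEdges(Map<Integer, String> vertexValues,
                                               List<int[]> edges) {
        // Group representative = smallest id sharing the vertex value.
        Map<String, Integer> repByValue = new HashMap<>();
        for (Map.Entry<Integer, String> v : vertexValues.entrySet()) {
            repByValue.merge(v.getValue(), v.getKey(), Math::min);
        }
        // Map every vertex to its group representative.
        Map<Integer, Integer> repOf = new HashMap<>();
        for (Map.Entry<Integer, String> v : vertexValues.entrySet()) {
            repOf.put(v.getKey(), repByValue.get(v.getValue()));
        }
        // Re-point edges to representatives and count duplicates.
        Map<String, Integer> edgeCounts = new HashMap<>();
        for (int[] e : edges) {
            String key = repOf.get(e[0]) + "->" + repOf.get(e[1]);
            edgeCounts.merge(key, 1, Integer::sum);
        }
        return edgeCounts;
    }
}
```

Run on the example input graph above, this yields exactly the "summarized on vertex value" edge counts listed in the issue (e.g. 3 edges from Dresden users to Leipzig users).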
[jira] [Commented] (FLINK-2411) Add basic graph summarization algorithm
[ https://issues.apache.org/jira/browse/FLINK-2411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14643995#comment-14643995 ] Vasia Kalavri commented on FLINK-2411: -- Hi [~mju]! Thank you for the detailed description ^^ This would be a great addition to the Gelly library! I agree with you, let's start with a simple algorithm as you describe it. I'll assign this issue to you. Let us know if you have questions! > Add basic graph summarization algorithm > --- > > Key: FLINK-2411 > URL: https://issues.apache.org/jira/browse/FLINK-2411 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: master >Reporter: Martin Junghanns >Priority: Minor > > Graph summarization determines a structural grouping of similar vertices and > edges to condense a graph and thus helps to uncover insights about patterns > hidden in the graph. It can be used in OLAP-style operations on the graph and > is similar to group by in SQL but on the graph structure instead of rows. > > The graph summarization operator represents every vertex group by a single > vertex in the summarized graph; edges between vertices in the summary graph > represent a group of edges between the vertex group members of the original > graph. Summarization is defined by specifying grouping keys for vertices and > edges, respectively. > One publication that presents a Map/Reduce based approach is "Pagrol: > Parallel graph olap over large-scale attributed graphs", however they > pre-compute the graph-cube before it can be analyzed. With Flink, we can give > the user an interactive way of summarizing the graph and do not need to > compute the cube beforehand. > A more complex approach focuses on summarization on graph patterns > "SynopSys: Large Graph Analytics in the SAP HANA Database Through > Summarization". 
> However, I want to start with a simple algorithm that summarizes the graph on > vertex and optionally edge values and additionally stores a count aggregate > at summarized vertices/edges. > Consider the following two examples (e.g., social network with users from > cities and friendships with timestamp): > > h4. Input graph: > > Vertices (id, value): > (0, Leipzig) > (1, Leipzig) > (2, Dresden) > (3, Dresden) > (4, Dresden) > (5, Berlin) > Edges (source, target, value): > (0, 1, 2014) > (1, 0, 2014) > (1, 2, 2013) > (2, 1, 2013) > (2, 3, 2014) > (3, 2, 2014) > (4, 0, 2013) > (4, 1, 2015) > (5, 2, 2015) > (5, 3, 2015) > h4. Output graph (summarized on vertex value): > Vertices (id, value, count) > (0, Leipzig, 2) // "2 users from Leipzig" > (2, Dresden, 3) // "3 users from Dresden" > (5, Berlin, 1) // "1 user from Berlin" > Edges (source, target, count) > (0, 0, 2) // "2 edges between users in Leipzig" > (0, 2, 1) // "1 edge from users in Leipzig to users in Dresden" > (2, 0, 3) // "3 edges from users in Dresden to users in Leipzig" > (2, 2, 2) // "2 edges between users in Dresden" > (5, 2, 2) // "2 edges from users in Berlin to users in Dresden" > h4. Output graph (summarized on vertex and edge value): > Vertices (id, value, count) > (0, Leipzig, 2) > (2, Dresden, 3) > (5, Berlin, 1) > Edges (source, target, value, count) > (0, 0, 2014, 2) // ... > (0, 2, 2013, 1) // ... > (2, 0, 2013, 2) // "2 edges from users in Dresden to users in Leipzig with > timestamp 2013" > (2, 0, 2015, 1) // "1 edge from users in Dresden to users in Leipzig with > timestamp 2015" > (2, 2, 2014, 2) // ... > (5, 2, 2015, 2) // ... > I've already implemented two versions of the summarization algorithm in our > own project [Gradoop|https://github.com/dbs-leipzig/gradoop], which is a > graph analytics stack on top of Hadoop + Gelly/Flink with a fixed data model. 
> You can see the current WIP here: > 1 [Abstract summarization|https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/gradoop-flink/src/main/java/org/gradoop/model/impl/operators/Summarization.java] > 2 [Implementation using cross|https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/gradoop-flink/src/main/java/org/gradoop/model/impl/operators/SummarizationCross.java] > 3 [Implementation using joins|https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/gradoop-flink/src/main/java/org/gradoop/model/impl/operators/SummarizationJoin.java] > 4 [Tests|https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/gradoop-flink/src/test/java/org/gradoop/model/impl/EPGraphSummarizeTest.java] > 5 [TestGraph|https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/dev-support/social-network.pdf] > I would basically use the same implementa
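The grouping semantics of the summarization example in this thread can be sketched in plain Java, with collections standing in for Flink DataSet operators. `SummarizeSketch` and its method names are illustrative, not Gradoop's API: vertices are grouped by value (smallest id as representative), and edges are re-keyed by the representatives of their endpoints.

```java
import java.util.*;
import java.util.stream.*;

// Plain-Java sketch of the summarization semantics from the example above;
// not the Gradoop/Flink implementation.
public class SummarizeSketch {
    static Map<Long, String> vertices = new LinkedHashMap<>();
    static long[][] edges = {
        {0,1},{1,0},{1,2},{2,1},{2,3},{3,2},{4,0},{4,1},{5,2},{5,3}
    };
    static {
        vertices.put(0L, "Leipzig"); vertices.put(1L, "Leipzig");
        vertices.put(2L, "Dresden"); vertices.put(3L, "Dresden");
        vertices.put(4L, "Dresden"); vertices.put(5L, "Berlin");
    }

    // vertex value -> representative id (smallest id in the group)
    static Map<String, Long> representatives() {
        Map<String, Long> rep = new HashMap<>();
        vertices.forEach((id, v) -> rep.merge(v, id, Math::min));
        return rep;
    }

    // summarized vertices: representative id -> member count
    static Map<Long, Long> vertexCounts() {
        Map<String, Long> rep = representatives();
        return vertices.values().stream()
            .collect(Collectors.groupingBy(rep::get, Collectors.counting()));
    }

    // summarized edges: (repSrc, repTgt) -> edge count
    static Map<List<Long>, Long> edgeCounts() {
        Map<String, Long> rep = representatives();
        return Arrays.stream(edges)
            .map(e -> List.of(rep.get(vertices.get(e[0])), rep.get(vertices.get(e[1]))))
            .collect(Collectors.groupingBy(e -> e, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(vertexCounts());
        System.out.println(edgeCounts());
    }
}
```

Running this reproduces the counts from the example: 2 users in Leipzig, 3 in Dresden, 1 in Berlin, and e.g. 3 edges from the Dresden group to the Leipzig group.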
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-125225039 Hi @shghatge, @andralungu, I built on the current state of this PR and made the proposed changes above, together with some styling changes. You can see the result on my [local branch](https://github.com/vasia/flink/tree/csvInput). If you agree with the changes, I will also update the documentation and merge this. Thank you! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1815) Add methods to read and write a Graph as adjacency list
[ https://issues.apache.org/jira/browse/FLINK-1815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642594#comment-14642594 ] Vasia Kalavri commented on FLINK-1815: -- Hey, unless I'm missing something, can't we just read the whole line of neighbors as text and do the splitting and edge emitting inside a flatMap? > Add methods to read and write a Graph as adjacency list > --- > > Key: FLINK-1815 > URL: https://issues.apache.org/jira/browse/FLINK-1815 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.9 >Reporter: Vasia Kalavri >Assignee: Faye Beligianni >Priority: Minor > > It would be nice to add utility methods to read a graph from an Adjacency > list format and also write a graph in such a format. > The simple case would be to read a graph with no vertex or edge values, where > we would need to define (a) a line delimiter, (b) a delimiter to separate > vertices from neighbor list and (c) and a delimiter to separate the neighbors. > For example, "1 2,3,4\n2 1,3" would give vertex 1 with neighbors 2, 3 and 4 > and vertex 2 with neighbors 1 and 3. > If we have vertex values and/or edge values, we also need to have a way to > separate IDs from values. For example, we could have "1 0.1 2 0.5, 3 0.2" to > define a vertex 1 with value 0.1, edge (1, 2) with weight 0.5 and edge (1, 3) > with weight 0.2. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
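The flatMap suggestion above can be sketched in plain Java: read each adjacency-list line as text and emit one edge per neighbor. The delimiters (newline, space, comma) follow the example in the issue; `AdjacencyListParser` is a hypothetical name, not a Gelly class.

```java
import java.util.*;

// Sketch of the flatMap idea: split each line into a source vertex and its
// neighbor list, then emit one (src, trg) edge per neighbor.
public class AdjacencyListParser {
    static List<long[]> parse(String input) {
        List<long[]> edges = new ArrayList<>();
        for (String line : input.split("\n")) {    // (a) line delimiter
            String[] parts = line.split(" ");      // (b) vertex vs. neighbor list
            long src = Long.parseLong(parts[0]);
            for (String n : parts[1].split(",")) { // (c) neighbor delimiter
                edges.add(new long[]{src, Long.parseLong(n)});
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        // "1 2,3,4\n2 1,3" -> edges 1->2, 1->3, 1->4, 2->1, 2->3
        for (long[] e : parse("1 2,3,4\n2 1,3")) {
            System.out.println(e[0] + "->" + e[1]);
        }
    }
}
```

In Flink this body would live inside a `FlatMapFunction` over lines, exactly as the comment proposes; the vertex/edge value variants would only add an extra split per token.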
[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/808#issuecomment-125132370 Perfect, thanks @PieterJanVanAeken! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2127) The GSA Documentation has trailing s
[ https://issues.apache.org/jira/browse/FLINK-2127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642411#comment-14642411 ] Vasia Kalavri commented on FLINK-2127: -- Is this one still a problem? > The GSA Documentation has trailing s > - > > Key: FLINK-2127 > URL: https://issues.apache.org/jira/browse/FLINK-2127 > Project: Flink > Issue Type: Bug > Components: Documentation, Gelly >Affects Versions: 0.9 >Reporter: Andra Lungu >Assignee: Maximilian Michels >Priority: Minor > Fix For: 0.9 > > > Within the GSA section of the documentation, there are trailing `class="text-center"> image` HTML fragments. > It would be nice to remove them :) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2163) VertexCentricConfigurationITCase sometimes fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642410#comment-14642410 ] Vasia Kalavri commented on FLINK-2163: -- I will mark this as resolved. We haven't seen any instance of this since we ported tests to using collect(). > VertexCentricConfigurationITCase sometimes fails on Travis > -- > > Key: FLINK-2163 > URL: https://issues.apache.org/jira/browse/FLINK-2163 > Project: Flink > Issue Type: Bug > Components: Gelly >Reporter: Aljoscha Krettek > Fix For: 0.10 > > > This is the relevant output from the log: > {code} > testIterationINDirection[Execution mode = > CLUSTER](org.apache.flink.graph.test.VertexCentricConfigurationITCase) Time > elapsed: 0.587 sec <<< FAILURE! > java.lang.AssertionError: Different number of lines in expected and obtained > result. expected:<5> but was:<2> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:555) > at > org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:270) > at > org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:256) > at > org.apache.flink.graph.test.VertexCentricConfigurationITCase.after(VertexCentricConfigurationITCase.java:70) > Results : > Failed tests: > > VertexCentricConfigurationITCase.after:70->TestBaseUtils.compareResultsByLinesInMemory:256->TestBaseUtils.compareResultsByLinesInMemory:270 > Different number of lines in expected and obtained result. expected:<5> but > was:<2> > {code} > https://travis-ci.org/aljoscha/flink/jobs/65403502 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-2163) VertexCentricConfigurationITCase sometimes fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri resolved FLINK-2163. -- Resolution: Fixed Fix Version/s: 0.10 > VertexCentricConfigurationITCase sometimes fails on Travis > -- > > Key: FLINK-2163 > URL: https://issues.apache.org/jira/browse/FLINK-2163 > Project: Flink > Issue Type: Bug > Components: Gelly >Reporter: Aljoscha Krettek > Fix For: 0.10 > > > This is the relevant output from the log: > {code} > testIterationINDirection[Execution mode = > CLUSTER](org.apache.flink.graph.test.VertexCentricConfigurationITCase) Time > elapsed: 0.587 sec <<< FAILURE! > java.lang.AssertionError: Different number of lines in expected and obtained > result. expected:<5> but was:<2> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:555) > at > org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:270) > at > org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:256) > at > org.apache.flink.graph.test.VertexCentricConfigurationITCase.after(VertexCentricConfigurationITCase.java:70) > Results : > Failed tests: > > VertexCentricConfigurationITCase.after:70->TestBaseUtils.compareResultsByLinesInMemory:256->TestBaseUtils.compareResultsByLinesInMemory:270 > Different number of lines in expected and obtained result. expected:<5> but > was:<2> > {code} > https://travis-ci.org/aljoscha/flink/jobs/65403502 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/808#issuecomment-125112165 Hey @PieterJanVanAeken! Are you back working on this or would you rather someone else takes over? I think I can find some time next week to resolve remaining issues and bring this to "mergeable" state :) Just let me know! Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-123402206 I see your point @shghatge. However, I think naming just one method differently will be confusing.. If we're going to have custom method names, let's go with @andralungu's suggestion above and make sure we document these properly. I would prefer a bit shorter method names though. How about: 1). `keyType(K)` 2). `vertexTypes(K, VV)` 3). `edgeTypes(K, EV)` 4). `types(K, VV, EV)` ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
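As a rough illustration of why four distinctly named methods work where overloads would not, here is a hypothetical stub using the names proposed above. It only records which type combination was requested and is not Gelly's actual `GraphCsvReader`; the point is that `vertexTypes(K, VV)` and `edgeTypes(K, EV)` can coexist, whereas two overloads of `types` with those signatures would erase to the same `types(Class, Class)`.

```java
// Hypothetical sketch of the proposed entry points; names and shapes are
// assumptions from the discussion, not the merged Gelly API.
public class GraphCsvReaderSketch {
    static <K> String keyType(Class<K> key) {
        return "graph<" + key.getSimpleName() + ", NullValue, NullValue>";
    }
    static <K, VV> String vertexTypes(Class<K> key, Class<VV> vertexValue) {
        return "graph<" + key.getSimpleName() + ", "
            + vertexValue.getSimpleName() + ", NullValue>";
    }
    static <K, EV> String edgeTypes(Class<K> key, Class<EV> edgeValue) {
        return "graph<" + key.getSimpleName() + ", NullValue, "
            + edgeValue.getSimpleName() + ">";
    }
    static <K, VV, EV> String types(Class<K> key, Class<VV> vv, Class<EV> ev) {
        return "graph<" + key.getSimpleName() + ", " + vv.getSimpleName()
            + ", " + ev.getSimpleName() + ">";
    }

    public static void main(String[] args) {
        // Distinct names sidestep the erasure clash discussed in the thread:
        // types(Class<K>, Class<VV>) and types(Class<K>, Class<EV>) could not
        // both exist, since both erase to types(Class, Class).
        System.out.println(vertexTypes(Long.class, String.class));
        System.out.println(edgeTypes(Long.class, Double.class));
    }
}
```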
[GitHub] flink pull request: [FLINK-2375] Add Approximate Adamic Adar Simil...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/923#issuecomment-123394302 Hi @shghatge! The comments I left on #892 apply here as well. The difference would be that the neighborhoods of each of the neighbors will be represented as a bloom filter. It would also be nice to make the bloom filter parameters (size, number of hash functions, hash function) configurable, so that the user can adjust the false-positive rate and size based on their use-case. What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-123236061 Hi @shghatge, let me try to explain the implementation in detail here :) First of all, can you change this to a library method instead of an example? The modification should be very easy. You only need to move the class in the `org.apache.flink.graph.library` package and implement the `GraphAlgorithm` interface. This way, users will be able to use this method by simply calling `graph.run(new AdamicAdarSimilarity())`. Making this a library method also means that we don't have to use only Gelly methods, i.e. we can do a few things more efficiently. For example, in the beginning of the algorithm, you need to compute (1) the vertex "weights" and (2) the neighbor IDs for each vertex. Both these computations can be done with a single GroupReduce on the edges dataset, i.e. `edges.flatMap(...).groupBy(0).reduceGroup(...)`, where in the flatMap you simply create the opposite direction edges and in the reduceGroup you compute the neighborhood sets and degrees (size of the set) - weights. The result will be a dataset where each vertex has a `Tuple2` value with its "weight" as the first field and its neighbors as the second. Similarly, instead of using `getTriplets()` (which is convenient but quite expensive), you can compute the partial edge values with a single `groupReduceOnNeighbors`. Say you have vertices `(1, {d1, (2, 3, 4)})`, `(2, {d2, (1, 3)})`, `(3, {d3, (2, 4)})` and `(4, {d4, (1, 3)})`. In `groupReduceOnNeighbors`, vertex 1 will compute the following: - neighbor 2: common neighbor 3 -> emit `(2, 3, d1)` - neighbor 3: common neighbor 4 -> emit `(3, 4, d1)` Finally, you can `groupBy(0, 1)` this result dataset and compute the sums to get the final adamic-adar similarities. Let me know if this makes sense and whether you need more clarifications! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
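The single-pass preprocessing described in the comment above (a flatMap that creates opposite-direction edges, then a grouping that builds each vertex's neighbor set and its degree-based "weight") can be sketched in plain Java, with collections standing in for the Flink `flatMap(...).groupBy(0).reduceGroup(...)` pipeline. Class and method names are illustrative.

```java
import java.util.*;
import java.util.stream.*;

// Sketch of the preprocessing step: per vertex, the neighbor set and the
// weight 1/log(degree). Plain collections stand in for Flink operators.
public class VertexWeights {
    // "emit both directions, then group by vertex id"
    static Map<Long, Set<Long>> neighborSets(long[][] edges) {
        Map<Long, Set<Long>> nbrs = new HashMap<>();
        for (long[] e : edges) {
            nbrs.computeIfAbsent(e[0], k -> new TreeSet<>()).add(e[1]);
            nbrs.computeIfAbsent(e[1], k -> new TreeSet<>()).add(e[0]);
        }
        return nbrs;
    }

    // weight = 1/log(degree); real code must guard degree-1 vertices,
    // where log(1) = 0 would divide by zero.
    static Map<Long, Double> weights(long[][] edges) {
        return neighborSets(edges).entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey,
                     e -> 1.0 / Math.log(e.getValue().size())));
    }

    public static void main(String[] args) {
        long[][] edges = {{1,2},{1,3},{2,3},{1,4},{2,4}};
        System.out.println(neighborSets(edges)); // 1 -> {2,3,4}, 3 -> {1,2}, ...
        System.out.println(weights(edges));
    }
}
```

In the Flink version the result would be a dataset of `(vertexId, Tuple2<weight, neighborSet>)`, which the subsequent `groupReduceOnNeighbors` step consumes.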
[GitHub] flink pull request: [FLINK-1943] [gelly] Added GSA compiler and tr...
GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/916 [FLINK-1943] [gelly] Added GSA compiler and translation tests Thanks @StephanEwen for the help! You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink flink-1943 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/916.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #916 commit 24b44b97adba63e8ccf65c0616bb06f9532634a9 Author: vasia Date: 2015-07-08T10:38:27Z [FLINK-1943] [gelly] added flink-optimizer to gelly pom; added GSA compiler and translation tests --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2361) flatMap + distinct gives erroneous results for big data sets
[ https://issues.apache.org/jira/browse/FLINK-2361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627773#comment-14627773 ] Vasia Kalavri commented on FLINK-2361: -- Hey, I have seen this exception before and I think it was when debugging FLINK-1930. I don't think this is an operator correctness problem either. [~andralungu], I take it vertex 657282846 actually exists in the dataset and it's not garbage, right? As far as I remember, what seems to be happening is that the vertex dataset has not been fully generated when the message to this vertex ID is sent, i.e. some kind of blocking issue. Can you check if this problem is still there when you generate the vertex set separately, store it to disk and get input from edges and vertex files? > flatMap + distinct gives erroneous results for big data sets > > > Key: FLINK-2361 > URL: https://issues.apache.org/jira/browse/FLINK-2361 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 0.10 >Reporter: Andra Lungu > > When running the simple Connected Components algorithm (currently in Gelly) > on the twitter follower graph, with 1, 100 or 1 iterations, I get the > following error: > Caused by: java.lang.Exception: Target vertex '657282846' does not exist!. 
> at > org.apache.flink.graph.spargel.VertexCentricIteration$VertexUpdateUdfSimpleVV.coGroup(VertexCentricIteration.java:300) > at > org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:220) > at > org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) > at > org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139) > at > org.apache.flink.runtime.iterative.task.IterationTailPactTask.run(IterationTailPactTask.java:107) > at > org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:722) > Now this is very bizarre as the DataSet of vertices is produced from the > DataSet of edges... Which means there cannot be an edge with an invalid > target id... The method calls flatMap to isolate the src and trg ids and > distinct to ensure their uniqueness. > The algorithm works fine for smaller data sets... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
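The derivation the report relies on (vertices obtained from the edges via flatMap + distinct) can be sketched in plain Java to show the invariant that appears to be violated at scale: every edge target must, by construction, be in the derived vertex set. Names here are illustrative, not the Gelly code.

```java
import java.util.*;
import java.util.stream.*;

// Sketch of the flatMap + distinct derivation: every src and trg id of the
// edge set ends up in the vertex set, so "Target vertex does not exist"
// should be impossible if the derivation completed before messaging.
public class VertexDerivation {
    static Set<Long> vertexIds(long[][] edges) {
        return Arrays.stream(edges)
            .flatMap(e -> Stream.of(e[0], e[1])) // isolate src and trg ids
            .collect(Collectors.toSet());        // distinct
    }

    public static void main(String[] args) {
        long[][] edges = {{1, 2}, {2, 3}, {657282846L, 1}};
        Set<Long> ids = vertexIds(edges);
        for (long[] e : edges) {
            if (!ids.contains(e[1]))
                throw new IllegalStateException(
                    "Target vertex '" + e[1] + "' does not exist!");
        }
        System.out.println(ids.size() + " vertices"); // prints "4 vertices"
    }
}
```

Since the invariant holds trivially in the derivation itself, the comment's suspicion that the failure is a runtime/ordering issue (vertex dataset not fully generated before messages are sent) seems plausible.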
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-121215586 yes, I mean `NullValue.class` :) I'd like to know @shghatge's opinion, too! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-121072347 Still an overkill I think... Could another way be to have only `types(K, VV, EV)` with all 3 arguments and expect `NullValue` if a value is missing? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-121033796 I hadn't realized that they would both need to be called in my previous comment, my bad. Any idea for decent method names? `typesNoEdgeValue` and `typesNoVertexValue` seem really ugly to me :S --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-121025189 Hi @andralungu, do you mean support for POJOs as vertex / edge values? I guess that's a limitation we can't easily overcome, I agree. Still though, a nicely designed `fromCsv()` method would simplify the common case. As for the examples, I don't like what they currently look like in this PR either. However, that's not a problem of `fromCsv()`. The if-block can be easily simplified by changing `getDefaultEdgeDataSet` to `getDefaultGraph`. The else-block looks longer because of the mapper, which, in the current examples is in the main method. What I think is quite problematic, is the `types()` methods. Ideally, we would have the following: 1. `types(K)` : no vertex value, no edge value 2. `types(K, VV)`: no edge value 3. `types(K, EV)`: no vertex value 4. `types(K, VV, EV)`: both vertex and edge values are present However, because of type erasure, we can't have both 2 and 3. The current implementation (having separate `typesEdges` and `typesVertices`) means that both should always be called, even if not necessary. Another way would be to give 2 and 3 different names... So far I haven't been able to come up with a nice solution. Ideas? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-120404920 You can use this PR for the exact computation, no need to open a new one! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-120319905 Hi @shghatge! I agree, let's deal with the approximate version as a separate issue. In the end though, it would be nice to have a single library method and an input parameter to decide whether the computation should be exact or approximate. Regarding the bloom filter, the idea is for each vertex to build a bloom filter with its neighbors and "send" it to its neighbors. Then, each vertex can compare its own neighborhood (the exact one) with the received bloom filter neighborhoods. Take a look at how approximate Jaccard is computed in the okapi library [here](https://github.com/grafos-ml/okapi/blob/master/src/main/java/ml/grafos/okapi/graphs/similarity/Jaccard.java) (class `JaccardApproximation`). Let me know if you have more questions :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
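A toy version of the Bloom-filter idea described above (each vertex encodes its neighborhood in a bit array, and a receiving vertex tests its own exact neighbor ids against the received filter) might look like the following. The size, hash scheme, and class name are arbitrary illustrations, not okapi's implementation.

```java
import java.util.BitSet;
import java.util.Set;
import java.util.TreeSet;

// Toy Bloom filter over neighbor ids; false positives can only over-estimate
// the common-neighborhood size, as the thread notes.
public class NeighborhoodBloom {
    final BitSet bits;
    final int size, hashes;

    NeighborhoodBloom(int size, int hashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashes = hashes;
    }

    void add(long id) {
        for (int i = 0; i < hashes; i++)
            bits.set(Math.floorMod(Long.hashCode(id * 31 + i), size));
    }

    boolean mightContain(long id) {
        for (int i = 0; i < hashes; i++)
            if (!bits.get(Math.floorMod(Long.hashCode(id * 31 + i), size)))
                return false;
        return true; // possibly a false positive
    }

    // over-estimate of |exactNeighbors ∩ neighborhood encoded in filter|
    static int approxCommon(Set<Long> exactNeighbors, NeighborhoodBloom filter) {
        int c = 0;
        for (long n : exactNeighbors) if (filter.mightContain(n)) c++;
        return c;
    }

    public static void main(String[] args) {
        NeighborhoodBloom f = new NeighborhoodBloom(128, 3);
        for (long n : new long[]{2, 3, 4}) f.add(n); // neighborhood of vertex 1
        Set<Long> mine = new TreeSet<>(Set.of(3L, 4L, 5L)); // neighborhood of vertex 2
        System.out.println(approxCommon(mine, f)); // 2 here (true overlap {3, 4})
    }
}
```

Making `size` and `hashes` constructor parameters is exactly the configurability suggested in the review: larger filters lower the false-positive rate at the cost of bigger messages.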
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-119750756 Hi @shghatge, thank you for working on this! It's not a trivial problem :) I have two general comments: - I would prefer to see this as a library method instead of an example. As an example, it shows how to use neighborhood methods, joinWithVertices and getTriplets, but we already have the JaccardSimilarity for all of these. - The algorithm is actually very hard to scale. It essentially requires common neighborhood calculation. This is a real pain for large graphs with skewed vertices. @andralungu can explain to you more, since she's been working on optimizing similar problems for her work. An additional problem that this particular implementation has is that it also stores and sends vertex values, not only vertex IDs. If you try to run this with a big graph, it will probably crash or run forever. There are 2 things we could do: 1). Only send a list of vertex IDs and let the receiving vertex generate partial sums for each common neighbor. At the end, we can aggregate the sums per edge. For example, consider the edges (1, 2), (1, 3), (2, 3), (1, 4) and (2, 4). In the neighborhood method, vertex 3 will find that it is a common neighbor of vertices 1 and 2. Then, it will emit a tuple `<1, 2, 1/log(d3)>`. In the same way, vertex 4 will find that it is also a common neighbor of vertices 1 and 2 and it will emit a tuple `<1, 2, 1/log(d4)>`. The output of the neighborhood method will be a dataset of tuples that represent edges with partial sums. You can then compute the Adamic-Adar similarity by summing up the values per edge. 2). The above solution will be OK for regular graphs, but will still have problems in the presence of skew. Another approach would be to offer an approximate Adamic-Adar similarity, by representing neighborhoods as bloom filters. In this case, the size of common neighborhoods will be over-estimated, but the computation will be faster. 
@shghatge, @andralungu, let me know what you think and how you would like to proceed! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
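Option 1 above (each vertex emits a partial sum `1/log(deg)` for every pair of its neighbors, then the sums are aggregated per edge) can be sketched in plain Java. `AdamicAdarPartialSums` is an illustrative stand-in for the Flink neighborhood function plus `groupBy` aggregation, not the PR's code.

```java
import java.util.*;

// Sketch of the partial-sum scheme: vertex z contributes 1/log(deg(z)) to
// every pair (u, v) of its neighbors; summing per pair gives the pair's
// Adamic-Adar score.
public class AdamicAdarPartialSums {
    static Map<List<Long>, Double> scores(long[][] edges) {
        Map<Long, SortedSet<Long>> nbrs = new HashMap<>();
        for (long[] e : edges) {
            nbrs.computeIfAbsent(e[0], k -> new TreeSet<>()).add(e[1]);
            nbrs.computeIfAbsent(e[1], k -> new TreeSet<>()).add(e[0]);
        }
        Map<List<Long>, Double> sums = new HashMap<>();
        nbrs.forEach((z, ns) -> {
            // partial contribution of z; real code must guard degree-1
            // vertices, where log(1) = 0
            double w = 1.0 / Math.log(ns.size());
            List<Long> list = new ArrayList<>(ns);
            for (int i = 0; i < list.size(); i++)
                for (int j = i + 1; j < list.size(); j++)
                    sums.merge(List.of(list.get(i), list.get(j)), w, Double::sum);
        });
        return sums;
    }

    public static void main(String[] args) {
        // the edges from the comment above
        long[][] edges = {{1,2},{1,3},{2,3},{1,4},{2,4}};
        // pair (1, 2): common neighbors 3 and 4, each of degree 2,
        // so the score is 1/log(2) + 1/log(2)
        System.out.println(scores(edges).get(List.of(1L, 2L)));
    }
}
```

Note this sketch scores every pair with a common neighbor, not only existing edges; a final join against the edge set would restrict it, mirroring the per-edge aggregation the comment describes.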
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/892#discussion_r34204894 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/AdamicAdarSimilarityMeasure.java --- @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.ReduceNeighborsFunction; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.Triplet; +import org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData; +import java.util.HashSet; + +/** + * Given a directed, unweighted graph, return a weighted graph where the edge values are equal + * to the Adamic Acard similarity coefficient which is given as + * Summation of weights of common neighbors of the source and destination vertex --- End diff -- we usually use the term "weight" for an edge and "value" for a vertex --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/892#discussion_r34204825 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/AdamicAdarSimilarityMeasure.java --- @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.ReduceNeighborsFunction; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.Triplet; +import org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData; +import java.util.HashSet; + +/** + * Given a directed, unweighted graph, return a weighted graph where the edge values are equal + * to the Adamic Acard similarity coefficient which is given as --- End diff -- Acard -> Adar --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: Hits
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/765#issuecomment-119744587 Hi @mfahimazizi! Thank you for the update. It seems Travis is still complaining about checkstyle. Could you check again please? You can see these errors locally as well, if you run `mvn verify` inside `flink-gelly`. You are right that inside the `VertexUpdateFunction` you cannot access the edge values. However, you could add the edge value inside the message that you create in the `MessagingFunction` (you can access the edges there with `getEdges()`). Would that work for you? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Assigned] (FLINK-1943) Add Gelly-GSA compiler and translation tests
[ https://issues.apache.org/jira/browse/FLINK-1943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri reassigned FLINK-1943: Assignee: Vasia Kalavri > Add Gelly-GSA compiler and translation tests > > > Key: FLINK-1943 > URL: https://issues.apache.org/jira/browse/FLINK-1943 > Project: Flink > Issue Type: Test > Components: Gelly >Affects Versions: 0.9 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > These should be similar to the corresponding Spargel tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/877#issuecomment-118912040 Hi @shghatge, @andralungu! I left one comment that I think is quite serious. Apart from that, I had also left a minor comment on the gelly-guide changes in the last review. Once these are fixed, you have my +1 :)
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/877#discussion_r33951120 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java --- @@ -367,6 +394,20 @@ public void join(Vertex vertex, Edge edge, Collectorf0") --- End diff -- I think this annotation is wrong. It's the first field that's forwarded (the edge source). If that's the case, can you please investigate why your tests don't catch this?
[GitHub] flink pull request: [Flink-Gelly] [example] added missing assumpti...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/883#issuecomment-118181248 Hi @samk3211! Thank you for spotting this :) Can you please (1) close #881, (2) add this comment also to GSAPageRank, as @andralungu suggested, and (3) squash your commits into a single one? Thank you!
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/877#issuecomment-118180990 In my view it's a straightforward configuration option. If the user wants to propagate changes in this direction, they set the parameter. Also, I'm seeing that Gelly is starting to have way too many examples. Examples are there to show basic functionality, not every single feature of the API. Library methods are there to make common analysis easy to run, and docs are there to help with configuration options and customization. I.e., we should focus on the library and docs and only add examples to demonstrate major new additions.
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/877#issuecomment-118179278 Hi @shghatge! This looks good in general. Apart from my small inline comment, I have an objection regarding the added example. If I understand correctly, this example tries to compute all vertices that are reachable from each vertex. Is that correct? This is a computation that will cause the state to explode really fast, even for moderately large graphs. I would suggest that you remove the example completely. The docs should be enough to show the available functionality :)
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/877#discussion_r33826389 --- Diff: docs/libs/gelly_guide.md --- @@ -734,6 +737,24 @@ public static final class Apply { {% endhighlight %} +The following example illustrates the usage of the edge direction option. Vertices update their values to contain a list of all their in-neighbors. --- End diff -- What do you mean by "contain a list of all their in-neighbors"? Were you planning to add an example where vertices assign their neighborhoods as their values?
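The behavior being questioned here, vertices collecting the list of their in-neighbors, amounts to grouping edge sources by edge target. A minimal sketch in plain Java (illustrative only; not the gelly-guide example itself):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InNeighbors {
    // For each vertex, collect the list of its in-neighbors, i.e. the sources
    // of its incoming edges. Edges are (source, target) pairs of a directed graph.
    static Map<Integer, List<Integer>> inNeighbors(List<int[]> edges) {
        Map<Integer, List<Integer>> result = new HashMap<>();
        for (int[] e : edges) {
            result.computeIfAbsent(e[1], k -> new ArrayList<>()).add(e[0]);
        }
        return result;
    }
}
```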
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-118173012 Hi @shghatge! Thank you for the update :) I left some comments inline. There are still some formatting issues in the code. Please, carefully go through your changes and try to be consistent. Also, there are still several warnings regarding types, unused annotations, and unused variables. Can you please try to remove them? Your IDE should have a setting that gives you the list of warnings. Regarding the tests, it's better to create new test files for your methods, since you need to test with files and currently other tests use `collect()`. Finally, I find the `types()` methods a bit confusing. Could we maybe have separate types methods for the vertices and edges? e.g. `typesEdges(keyType, valueType)`, `typesEdges(keyType)`, `typesVertices(keyType, valueType)` and `typesVertices(keyType)`?
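The `typesEdges`/`typesVertices` split proposed above could take the shape of a small fluent builder. A hypothetical sketch: only the two method names come from the comment, everything else is illustrative and not the actual Flink API.

```java
// Hypothetical reader shape: separate type declarations for vertices and
// edges, as proposed in the review, instead of one combined types() call.
class CsvGraphBuilder {
    private Class<?> edgeKey, edgeValue, vertexKey, vertexValue;

    CsvGraphBuilder typesEdges(Class<?> keyType, Class<?> valueType) {
        this.edgeKey = keyType;
        this.edgeValue = valueType;
        return this;
    }

    CsvGraphBuilder typesVertices(Class<?> keyType, Class<?> valueType) {
        this.vertexKey = keyType;
        this.vertexValue = valueType;
        return this;
    }

    // Summarize the declared types, standing in for actually building a graph.
    String describe() {
        return "edges<" + edgeKey.getSimpleName() + "," + edgeValue.getSimpleName()
             + "> vertices<" + vertexKey.getSimpleName() + "," + vertexValue.getSimpleName() + ">";
    }
}
```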
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33823849 --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java --- @@ -52,16 +72,13 @@ public void testWithDoubleValueMapper() throws Exception { Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), new AssignDoubleValueMapper(), env); -DataSet> data = graph.getVertices(); -List> result= data.collect(); - + graph.getVertices().writeAsCsv(resultPath); --- End diff -- Same here.. We changed the tests to use `collect()` instead of files in #863. Please don't change it back ;)
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33823666 --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java --- @@ -54,16 +75,13 @@ public void testCreateWithoutVertexValues() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env); -DataSet> data = graph.getVertices(); -List> result= data.collect(); - + graph.getVertices().writeAsCsv(resultPath); --- End diff -- hmm it seems you're reverting the changes of #863?
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33822653 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java --- @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.CsvReader; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.Path; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.types.NullValue; +/** + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data + * The class also configures the CSV readers used to read edges(vertices) data such as the field types, + * the delimiters (row and field), the fields that should be included or skipped, and other flags + * such as whether to skip the initial line as the header. + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class. 
+ */ +public class GraphCsvReader { + + private final Path vertexPath,edgePath; + private final ExecutionEnvironment executionContext; + protected CsvReader EdgeReader; + protected CsvReader VertexReader; + protected MapFunction mapper; + +// + + public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) { + this.vertexPath = vertexPath; + this.edgePath = edgePath; + this.VertexReader = new CsvReader(vertexPath,context); + this.EdgeReader = new CsvReader(edgePath,context); + this.mapper=null; + this.executionContext=context; + } + + public GraphCsvReader(Path edgePath, ExecutionEnvironment context) { + this.vertexPath = null; + this.edgePath = edgePath; + this.EdgeReader = new CsvReader(edgePath,context); + this.VertexReader = null; + this.mapper = null; + this.executionContext=context; + } + + public GraphCsvReader(Path edgePath,final MapFunction mapper, ExecutionEnvironment context) { + this.vertexPath = null; + this.edgePath = edgePath; + this.EdgeReader = new CsvReader(edgePath,context); + this.VertexReader = null; + this.mapper = mapper; + this.executionContext=context; + } + + public GraphCsvReader (String edgePath,ExecutionEnvironment context) { + this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context); + + } + + public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) { + this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context); + } + + + public GraphCsvReader (String edgePath, final MapFunction mapper, ExecutionEnvironment context) { + this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context); + } + + public CsvReader getEdgeReader() { + return this.EdgeReader; + } + + public CsvReader getVertexReader() { + return this.VertexReader; + } + // + + /** +* Specifies the types for
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33822502 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java --- @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.CsvReader; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.Path; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.types.NullValue; +/** + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data + * The class also configures the CSV readers used to read edges(vertices) data such as the field types, + * the delimiters (row and field), the fields that should be included or skipped, and other flags + * such as whether to skip the initial line as the header. + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class. 
+ */ +public class GraphCsvReader { + + private final Path vertexPath,edgePath; + private final ExecutionEnvironment executionContext; + protected CsvReader EdgeReader; + protected CsvReader VertexReader; + protected MapFunction mapper; + +// + + public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) { + this.vertexPath = vertexPath; + this.edgePath = edgePath; + this.VertexReader = new CsvReader(vertexPath,context); + this.EdgeReader = new CsvReader(edgePath,context); + this.mapper=null; + this.executionContext=context; + } + + public GraphCsvReader(Path edgePath, ExecutionEnvironment context) { + this.vertexPath = null; + this.edgePath = edgePath; + this.EdgeReader = new CsvReader(edgePath,context); + this.VertexReader = null; + this.mapper = null; + this.executionContext=context; + } + + public GraphCsvReader(Path edgePath,final MapFunction mapper, ExecutionEnvironment context) { + this.vertexPath = null; + this.edgePath = edgePath; + this.EdgeReader = new CsvReader(edgePath,context); + this.VertexReader = null; + this.mapper = mapper; + this.executionContext=context; + } + + public GraphCsvReader (String edgePath,ExecutionEnvironment context) { + this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context); + + } + + public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) { + this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context); + } + + + public GraphCsvReader (String edgePath, final MapFunction mapper, ExecutionEnvironment context) { + this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context); + } + + public CsvReader getEdgeReader() { + return this.EdgeReader; + } + + public CsvReader getVertexReader() { + return this.VertexReader; + } + // + + /** +* Specifies the types for
[GitHub] flink pull request: [FLINK-2150] Added zipWithUniqueIds
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/801#issuecomment-118164049 Great work @andralungu, +1 from me! Make sure you squash your commits and appropriately update the commit message before merging ;)
[GitHub] flink pull request: [FLINK-2150] Added zipWithUniqueIds
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/801#discussion_r33820633 --- Diff: docs/apis/zip_elements_guide.md --- @@ -63,4 +63,44 @@ env.execute() will yield the tuples: (0,A), (1,B), (2,C), (3,D), (4,E), (5,F) +[Back to top](#top) + +### Zip with a Unique Identifier +In many cases, one may not need to assign consecutive labels. +`zipWithUniqueId` works in a pipelined fashion, speeding up the label assignment process. This method receives a data set as input and returns a new data set of (unique id, initial value) tuples. +For example, the following code: + + + +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); +DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F"); + +DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithUniqueId(in); + +result.writeAsCsv(resultPath, "\n", ","); +env.execute(); {% endhighlight %} + + + +{% highlight scala %} +import org.apache.flink.api.scala._ + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F") + +val result: DataSet[(Long, String)] = input.zipWithUniqueId + +result.writeAsCsv(resultPath, "\n", ",") +env.execute() {% endhighlight %} + + + + +will yield the tuples: (0,A), (2,B), (4,C), (6,D), (8,E), (10,F) --- End diff -- Is the result deterministic?
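On the determinism question: the spacing in the expected output, (0,A), (2,B), (4,C), ..., is what one gets when each parallel task interleaves a local counter with its task index. A sketch of one such scheme, presented as the general idea and not necessarily Flink's exact implementation:

```java
class UniqueIds {
    // Unique (but not consecutive) ids across parallel tasks: shift a local
    // counter left by enough bits to hold any task index, then OR the index in.
    // Ids from different tasks can never collide because they differ in the
    // low bits. With parallelism 1 this yields 0, 2, 4, ... as in the docs.
    static long uniqueId(long localCounter, int taskIndex, int parallelism) {
        int shift = 64 - Long.numberOfLeadingZeros(parallelism); // bits reserved for taskIndex
        return (localCounter << shift) | taskIndex;
    }
}
```

Under this scheme the result is deterministic for a fixed parallelism and a fixed partitioning of the input, but the assigned ids change when either of those changes.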
[GitHub] flink pull request: Hits
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/765#issuecomment-118161158 Hi there! What is the status of this PR? It seems that Travis is failing with checkstyle violations related to using spaces instead of tabs. Let us know if you need further help!
[jira] [Resolved] (FLINK-2271) PageRank gives wrong results with weighted graph input
[ https://issues.apache.org/jira/browse/FLINK-2271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri resolved FLINK-2271. -- Resolution: Fixed > PageRank gives wrong results with weighted graph input > -- > > Key: FLINK-2271 > URL: https://issues.apache.org/jira/browse/FLINK-2271 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 0.9 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > Fix For: 0.10 > > > The current implementation of the PageRank algorithm expects a weighted edge > list as input. However, if the edge weight is other than 1.0, this will > result in wrong results. > We should change the library method and corresponding examples (also > GSAPageRank) to expect an unweighted graph and compute the transition > probabilities correctly.
[jira] [Commented] (FLINK-2163) VertexCentricConfigurationITCase sometimes fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14602000#comment-14602000 ] Vasia Kalavri commented on FLINK-2163: -- This test has now been changed to use collect(). Can we assume that this issue is now resolved? > VertexCentricConfigurationITCase sometimes fails on Travis > -- > > Key: FLINK-2163 > URL: https://issues.apache.org/jira/browse/FLINK-2163 > Project: Flink > Issue Type: Bug > Components: Gelly >Reporter: Aljoscha Krettek > > This is the relevant output from the log: > {code} > testIterationINDirection[Execution mode = > CLUSTER](org.apache.flink.graph.test.VertexCentricConfigurationITCase) Time > elapsed: 0.587 sec <<< FAILURE! > java.lang.AssertionError: Different number of lines in expected and obtained > result. expected:<5> but was:<2> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:743) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:555) > at > org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:270) > at > org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory(TestBaseUtils.java:256) > at > org.apache.flink.graph.test.VertexCentricConfigurationITCase.after(VertexCentricConfigurationITCase.java:70) > Results : > Failed tests: > > VertexCentricConfigurationITCase.after:70->TestBaseUtils.compareResultsByLinesInMemory:256->TestBaseUtils.compareResultsByLinesInMemory:270 > Different number of lines in expected and obtained result. expected:<5> but > was:<2> > {code} > https://travis-ci.org/aljoscha/flink/jobs/65403502
[jira] [Resolved] (FLINK-1522) Add tests for the library methods and examples
[ https://issues.apache.org/jira/browse/FLINK-1522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri resolved FLINK-1522. -- Resolution: Fixed Fix Version/s: 0.10 > Add tests for the library methods and examples > -- > > Key: FLINK-1522 > URL: https://issues.apache.org/jira/browse/FLINK-1522 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > Labels: easyfix, test > Fix For: 0.10 > > > The current tests in gelly test one method at a time. We should have some > tests for complete applications. As a start, we could add one test case per > example and this way also make sure that our graph library methods actually > give correct results. > I'm assigning this to [~andralungu] because she has already implemented the > test for SSSP, but I will help as well.
[jira] [Resolved] (FLINK-2093) Add a difference method to Gelly's Graph class
[ https://issues.apache.org/jira/browse/FLINK-2093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri resolved FLINK-2093. -- Resolution: Implemented Fix Version/s: 0.10 Congrats on your first contribution [~shivani94] :)) > Add a difference method to Gelly's Graph class > -- > > Key: FLINK-2093 > URL: https://issues.apache.org/jira/browse/FLINK-2093 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 0.9 >Reporter: Andra Lungu >Assignee: Shivani Ghatge >Priority: Minor > Fix For: 0.10 > > > This method will compute the difference between two graphs, returning a new > graph containing the vertices and edges that the current graph and the input > graph don't have in common.
[jira] [Resolved] (FLINK-2264) Migrate integration tests for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri resolved FLINK-2264. -- Resolution: Fixed Fix Version/s: 0.10 Congrats on your first contribution [~Samia]! > Migrate integration tests for Gelly > --- > > Key: FLINK-2264 > URL: https://issues.apache.org/jira/browse/FLINK-2264 > Project: Flink > Issue Type: Sub-task > Components: Gelly, Tests >Affects Versions: 0.10 >Reporter: Vasia Kalavri >Assignee: Samia Khalid >Priority: Minor > Fix For: 0.10
[GitHub] flink pull request: [FLINK-2271] [FLINK-1522] [gelly] add missing ...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/865#issuecomment-115252595 Thank you for looking at this @mxm :) I'll merge.
[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/818#issuecomment-115215460 Thank you @shghatge! I'll merge this :)
[GitHub] flink pull request: [FLINK-2264] [gelly] changed the tests to use ...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/863#issuecomment-115194215 Thank you @samk3211! This looks good :) I see that, like here, @mjsax has also created a utils class for the new comparison methods in #866. Since all migrated tests will be using these methods, I will just move them to `TestBaseUtils` before merging this.
[GitHub] flink pull request: [FLINK-2271] [FLINK-1522] [gelly] add missing ...
GitHub user vasia opened a pull request: https://github.com/apache/flink/pull/865 [FLINK-2271] [FLINK-1522] [gelly] add missing example tests This PR adds missing Gelly tests for PageRank and MusicProfiles examples. It also changes both vertex-centric and GSA PageRank examples to expect an unweighted edge list as input and to properly assign the transition probabilities as edge weights. You can merge this pull request into a Git repository by running: $ git pull https://github.com/vasia/flink gelly-example-tests Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/865.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #865 commit ac6c8ac63d730fd03125b3798778e76267c59dcb Author: vasia Date: 2015-06-24T16:09:13Z [FLINK-2271] [FLINK-1522] [gelly] changed PageRank example to expect an unweighted edge list added PageRank and MusicProfiles tests got rid of unused suppress warning annotations in Graph and JaccardSimilarityMeasure changed GSAPageRank example to expect an unweighted edge list
[jira] [Assigned] (FLINK-2271) PageRank gives wrong results with weighted graph input
[ https://issues.apache.org/jira/browse/FLINK-2271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri reassigned FLINK-2271: Assignee: Vasia Kalavri > PageRank gives wrong results with weighted graph input > -- > > Key: FLINK-2271 > URL: https://issues.apache.org/jira/browse/FLINK-2271 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 0.9 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > Fix For: 0.10 > > > The current implementation of the PageRank algorithm expects a weighted edge > list as input. However, if the edge weight is other than 1.0, this will > result in wrong results. > We should change the library method and corresponding examples (also > GSAPageRank) to expect an unweighted graph and compute the transition > probabilities correctly.
[jira] [Updated] (FLINK-2271) PageRank gives wrong results with weighted graph input
[ https://issues.apache.org/jira/browse/FLINK-2271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri updated FLINK-2271: - Affects Version/s: (was: 0.9.1) (was: 0.10) 0.9 > PageRank gives wrong results with weighted graph input > -- > > Key: FLINK-2271 > URL: https://issues.apache.org/jira/browse/FLINK-2271 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 0.9 >Reporter: Vasia Kalavri > > The current implementation of the PageRank algorithm expects a weighted edge > list as input. However, if the edge weight is other than 1.0, this will > result in wrong results. > We should change the library method and corresponding examples (also > GSAPageRank) to expect an unweighted graph and compute the transition > probabilities correctly.
[jira] [Created] (FLINK-2271) PageRank gives wrong results with weighted graph input
Vasia Kalavri created FLINK-2271: Summary: PageRank gives wrong results with weighted graph input Key: FLINK-2271 URL: https://issues.apache.org/jira/browse/FLINK-2271 Project: Flink Issue Type: Bug Components: Gelly Affects Versions: 0.10, 0.9.1 Reporter: Vasia Kalavri The current implementation of the PageRank algorithm expects a weighted edge list as input. However, if the edge weight is other than 1.0, this will result in wrong results. We should change the library method and corresponding examples (also GSAPageRank) to expect an unweighted graph and compute the transition probabilities correctly.
[jira] [Updated] (FLINK-2264) Migrate integration tests for Gelly
[ https://issues.apache.org/jira/browse/FLINK-2264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vasia Kalavri updated FLINK-2264: - Assignee: Samia Khalid > Migrate integration tests for Gelly > --- > > Key: FLINK-2264 > URL: https://issues.apache.org/jira/browse/FLINK-2264 > Project: Flink > Issue Type: Sub-task > Components: Gelly, Tests >Affects Versions: 0.10 >Reporter: Vasia Kalavri >Assignee: Samia Khalid >Priority: Minor
[jira] [Created] (FLINK-2264) Migrate integration tests for Gelly
Vasia Kalavri created FLINK-2264: Summary: Migrate integration tests for Gelly Key: FLINK-2264 URL: https://issues.apache.org/jira/browse/FLINK-2264 Project: Flink Issue Type: Sub-task Components: Gelly, Tests Affects Versions: 0.10 Reporter: Vasia Kalavri Priority: Minor
[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/808#issuecomment-114426618 Oh, one more thing :) In order to make this work in Eclipse, I had to add a few lines to the pom.xml. Basically, I copied the `` block from the streaming-scala project. I'm not sure whether everything in there is needed, so if someone knows better, please let us know ;) Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/818#issuecomment-114430160 Hi @shghatge! Thank you for the quick update. Apart from my minor comment, this looks good now :)
[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/818#discussion_r33026720 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java --- @@ -1234,6 +1248,17 @@ public void coGroup(Iterable> edge, Iterable> edgeToBeRe } /** +* Performs Difference on the vertex and edge sets of the input graphs +* removes common vertices and edges. If a source/target vertex is removed, its corresponding edge will also be removed +* @param graph the graph to perform difference with +* @return a new graph where the common vertices and edges have been removed +*/ + public Graph difference(Graph graph) throws java.lang.Exception{ --- End diff -- why the `throws` declaration?
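For illustration only (this is not the Gelly implementation; the class and method names below are hypothetical), a set difference over plain Java collections sketches the semantics described in the javadoc under review: remove the common vertices, then drop every edge whose source or target vertex was removed. Note that no step in the operation inherently needs a `throws` clause, which is the point of the review question above.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch of the "difference" semantics discussed in the review:
 * remove the common vertices, then keep only the edges whose endpoints
 * survived. No checked exception is needed anywhere.
 */
public class GraphDifferenceSketch {

    /** A simple directed edge as a (source, target) pair of vertex ids. */
    public record Edge(long source, long target) {}

    /** Vertices of this graph minus the vertices of the other graph. */
    public static Set<Long> vertexDifference(Set<Long> vertices, Set<Long> toRemove) {
        Set<Long> result = new HashSet<>(vertices);
        result.removeAll(toRemove); // drop the common vertices
        return result;
    }

    /** Keep an edge only if both of its endpoints survived the difference. */
    public static Set<Edge> pruneEdges(Set<Edge> edges, Set<Long> remainingVertices) {
        Set<Edge> result = new HashSet<>();
        for (Edge e : edges) {
            if (remainingVertices.contains(e.source())
                    && remainingVertices.contains(e.target())) {
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<Long> vertices = Set.of(1L, 2L, 3L, 4L);
        Set<Edge> edges = Set.of(new Edge(1L, 2L), new Edge(3L, 4L));

        Set<Long> remaining = vertexDifference(vertices, Set.of(3L, 4L));
        System.out.println(remaining);                    // vertices 1 and 2 survive
        System.out.println(pruneEdges(edges, remaining)); // only the 1 -> 2 edge survives
    }
}
```

In the real Graph API these steps would of course be expressed as dataset transformations, but the control flow is the same and is entirely exception-free.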
[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/808#issuecomment-114422736 Hey, I also checked this over the weekend! (and realized how rusty my Scala has gotten :-S) I agree with both of you on the scaladocs, docs and formatting issues. There is some functionality missing, but since the Java API is also evolving, we should probably stop at some point and add the missing methods as separate issues. It would also be nice to have 1-2 examples implemented with the Scala API and then we can work on adding the rest at a later point.
[jira] [Commented] (FLINK-2032) Migrate integration tests from temp output files to collect()
[ https://issues.apache.org/jira/browse/FLINK-2032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14597403#comment-14597403 ] Vasia Kalavri commented on FLINK-2032: -- Hi there, I am working with a new contributor and she's interested in working on this issue. We think we can start by porting the Gelly tests first. Shall I create a new issue and make it a sub-task of this one? Thanks! > Migrate integration tests from temp output files to collect() > - > > Key: FLINK-2032 > URL: https://issues.apache.org/jira/browse/FLINK-2032 > Project: Flink > Issue Type: Task > Components: Tests >Affects Versions: 0.9 >Reporter: Fabian Hueske >Priority: Minor > Labels: starter > > Most of Flink's integration tests that execute full Flink programs and check > their results are implemented by writing results to a temporary output file and > comparing the content of the file to a provided set of expected Strings. > Flink's test utils make this quite comfortable and hide a lot of the > complexity of this approach. Nonetheless, this approach has a few drawbacks: > - increased latency by going through disk > - comparison is on the String representation of objects > - depends on the file system > Since Flink's {{collect()}} feature was added, the temp file approach is no longer > the best option. Instead, tests can collect the result of a Flink > program directly as objects and compare these against a set of expected > objects. > It would be good to migrate the existing test base to use {{collect()}} > instead of temporary output files.
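The difference between the two test styles in FLINK-2032 can be sketched without Flink itself (the `runProgram` method below is a hypothetical stand-in for executing a Flink job): the temp-file style round-trips the result through disk and compares String representations, while the collect() style asserts on the result objects directly.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/**
 * Sketch of the two integration-test styles discussed in FLINK-2032.
 * runProgram() is a hypothetical stand-in for running a Flink program
 * and obtaining its result.
 */
public class CollectVsTempFile {

    static List<Integer> runProgram() {
        return List.of(1, 2, 3); // stand-in for the program's output
    }

    /** Old style: write results to a temp file, compare the String content. */
    static boolean tempFileStyle() {
        try {
            Path out = Files.createTempFile("result", ".txt");
            try {
                Files.writeString(out, runProgram().toString());
                // comparison happens on String representations, via the file system
                return Files.readString(out).equals("[1, 2, 3]");
            } finally {
                Files.delete(out);
            }
        } catch (IOException e) {
            return false; // disk I/O is itself a source of test flakiness
        }
    }

    /** New style: collect the results as objects and compare them directly. */
    static boolean collectStyle() {
        return runProgram().equals(List.of(1, 2, 3)); // no disk, no String round-trip
    }

    public static void main(String[] args) {
        System.out.println(tempFileStyle());
        System.out.println(collectStyle());
    }
}
```

The drawbacks listed in the issue (disk latency, String-level comparison, file-system dependence) all live in `tempFileStyle` and simply disappear in `collectStyle`.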
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32679424 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java --- @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.CsvReader; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.Path; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.types.NullValue; +import org.apache.flink.core.fs.Path; + + +/** + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data + * The class also configures the CSV readers used to read edges(vertices) data such as the field types, + * the delimiters (row and field), the fields that should be included or skipped, and other flags + * such as whether to skip the initial line as the header. 
+ * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class. + */ + + + +public class GraphCsvReader{ + + private final Path path1,path2; + private final ExecutionEnvironment executionContext; + + private Path edgePath; + private Path vertexPath; + protected CsvReader EdgeReader; + protected CsvReader VertexReader; + protected MapFunction mapper; + +// + + public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context) + { + this.path1 = path1; + this.path2 = path2; + this.VertexReader = new CsvReader(path1,context); + this.EdgeReader = new CsvReader(path2,context); + this.mapper=null; + this.executionContext=context; + } + + public GraphCsvReader(Path path2, ExecutionEnvironment context) + { + this.path1=null; + this.path2 = path2; + this.EdgeReader = new CsvReader(path2,context); + this.VertexReader = null; + this.mapper = null; + this.executionContext=context; + } + + public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context) + { + this.path1=null; + this.path2 = path2; + this.EdgeReader = new CsvReader(path2,context); + this.VertexReader = null; + this.mapper = mapper; + this.executionContext=context; + } + + public GraphCsvReader (String path2,ExecutionEnvironment context) + { + this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context); + + } + + public GraphCsvReader(String path1, String path2, ExecutionEnvironment context) + { + this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context); + } + + + public GraphCsvReader (String path2, final MapFunction mapper, ExecutionEnvironment context) + { + + this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context); + + + } + + public CsvReader getEdgeReader() + { + return this.EdgeReader; + } + + public CsvReader getVerte
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-112954221 Hey @shghatge, this is a great first try, you got the logic right and I really like the detailed javadocs ^^ I left a few inline comments, which should be easy to fix. Let me also elaborate a bit on some general guidelines: - Code formatting: we don't really have a strict Java code style, but there are a few things you can improve. For your code to be readable, it is nice to leave a space after the commas separating arguments. For example `myMethod(arg1, arg2, arg3)` instead of `myMethod(arg1,arg2,arg3)`. We usually separate the closing of a parenthesis and the opening of a curly bracket with a space, i.e. `myMethod() { ... }` instead of `myMethod(){ ... }`. Also, try to avoid adding new lines if they are not needed. Regarding the missing types, this is not creating an error, but gives a warning. You can turn on warning notification settings in your IDE to catch this. - I like that you added separate `includeFields` methods for vertices and edges. It would probably make sense to do the same for the rest of the methods. For example, you might want to skip the first line in the edges file, but not in the vertices file. Right now, you are forced to either do both or none. Alternatively, we could add parameters to the existing methods, to define the behavior for edges and vertices files separately. For example `public GraphCsvReader lineDelimiter(String vertexDelimiter, String edgeDelimiter)`. What do you think? - Finally, in order to catch issues like the one with the null `VertexReader`, you should always try to test as much of the functionality you have added as possible. In this case, it would be a good idea to add a test reading from edges only and some tests for the different methods you have added. Let me know if you have questions!
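The per-file configuration suggested in the review could look roughly like this (all names are hypothetical; the actual GraphCsvReader API under review differs): each configuration option exists either as a vertex/edge pair of methods or as one method taking both settings, so e.g. a header line can be skipped in the edges file only.

```java
/**
 * Hypothetical sketch of a CSV graph reader that keeps separate settings
 * for the vertex file and the edge file, as suggested in the review.
 * Not the actual Gelly GraphCsvReader.
 */
public class CsvGraphReaderSketch {

    private String vertexLineDelimiter = "\n";
    private String edgeLineDelimiter = "\n";
    private boolean skipFirstLineVertices = false;
    private boolean skipFirstLineEdges = false;

    // Variant 1: separate methods per file, mirroring the includeFields pair.
    public CsvGraphReaderSketch lineDelimiterVertices(String delimiter) {
        this.vertexLineDelimiter = delimiter;
        return this;
    }

    public CsvGraphReaderSketch lineDelimiterEdges(String delimiter) {
        this.edgeLineDelimiter = delimiter;
        return this;
    }

    // Variant 2: one method that takes the setting for both files at once.
    public CsvGraphReaderSketch ignoreFirstLine(boolean vertices, boolean edges) {
        this.skipFirstLineVertices = vertices;
        this.skipFirstLineEdges = edges;
        return this;
    }

    public String vertexLineDelimiter() { return vertexLineDelimiter; }
    public String edgeLineDelimiter() { return edgeLineDelimiter; }
    public boolean skipsFirstLineVertices() { return skipFirstLineVertices; }
    public boolean skipsFirstLineEdges() { return skipFirstLineEdges; }

    public static void main(String[] args) {
        CsvGraphReaderSketch reader = new CsvGraphReaderSketch()
                .lineDelimiterEdges("|")
                .ignoreFirstLine(false, true); // header only in the edges file
        System.out.println(reader.edgeLineDelimiter());
        System.out.println(reader.skipsFirstLineEdges());
    }
}
```

Either variant removes the "both or none" limitation; the two-parameter variant keeps the method count down, while the per-file variant reads more naturally in call chains.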
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32676277 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32674908 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32674993 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32674944 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32675011 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32674923 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32674967 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32674930 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32673806 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32673548

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
+    public GraphCsvReader(Path path2,final MapFunction mapper, ExecutionEnvironment context)

--- End diff --

add type arguments to MapFunction

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
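To illustrate the review note above ("add type arguments to MapFunction"), here is a minimal, self-contained sketch of the difference type arguments make. The `MapFunction` interface below is a simplified stand-in for `org.apache.flink.api.common.functions.MapFunction` (the stand-in omits `Serializable` and the `throws Exception` clause), and `TypedMapperDemo` is a hypothetical class, not part of the PR:

```java
// Simplified stand-in for Flink's MapFunction interface (illustrative only).
interface MapFunction<T, O> {
    O map(T value);
}

public class TypedMapperDemo {

    // With type arguments, the compiler checks that the mapper consumes the
    // vertex-id type K and produces the vertex-value type VV. The raw-typed
    // field flagged in the review ("protected MapFunction mapper;") would
    // accept any mapper and defer errors to runtime casts.
    static <K, VV> VV applyMapper(MapFunction<K, VV> mapper, K id) {
        return mapper.map(id);
    }

    public static void main(String[] args) {
        // A mapper initializing a vertex value from its Long id.
        MapFunction<Long, Double> initVertexValue = id -> id * 2.0;
        System.out.println(applyMapper(initVertexValue, 21L)); // prints 42.0
    }
}
```

With a raw `MapFunction`, passing e.g. a `MapFunction<String, Integer>` where a `Long`-keyed mapper is expected would still compile; the typed version rejects it at compile time.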
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32673330

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
@@ -282,6 +282,58 @@ public void flatMap(Edge edge, Collector> out) {
 }

 /**
+     * Creates a graph from CSV files.
+     *
+     * Vertices with value are created from a CSV file with 2 fields
+     * Edges with value are created from a CSV file with 3 fields
+     * from Tuple3.
+     *
+     * @param path1 path to a CSV file with the Vertices data.
+     * @param path2 path to a CSV file with the Edges data
+     * @param context the flink execution environment.
+     * @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
+     * Vertex ID, Vertex Value and Edge value returns a Graph
+     */
+
+    public static GraphCsvReader fromCsvReader(String path1, String path2, ExecutionEnvironment context){
+        return (new GraphCsvReader(path1,path2,context));
+    }
+
+    /** Creates a graph from a CSV file for Edges., Vertices are
+     * induced from the edges.
+     *
+     * Edges with value are created from a CSV file with 3 fields. Vertices are created
+     * automatically and their values are set to NullValue.
+     *
+     * @param path a path to a CSV file with the Edges data
+     * @param context the flink execution environment.
+     * @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
+     * Vertex ID, Vertex Value and Edge value returns a Graph
+     */
+
+    public static GraphCsvReader fromCsvReader(String path, ExecutionEnvironment context){
+        return (new GraphCsvReader(path,context));
+    }
+
+    /**
+     * Creates a graph from a CSV file for Edges., Vertices are
+     * induced from the edges and vertex values are calculated by a mapper
+     * function. Edges with value are created from a CSV file with 3 fields.
+     * Vertices are created automatically and their values are set by applying the provided map
+     * function to the vertex ids.
+     *
+     * @param path a path to a CSV file with the Edges data
+     * @param mapper the mapper function.
+     * @param context the flink execution environment.
+     * @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
+     * Vertex ID, Vertex Value and Edge value returns a Graph
+     */
+
+    public static GraphCsvReader fromCsvReader(String path, final MapFunction mapper,ExecutionEnvironment context)

--- End diff --

add type arguments to MapFunction
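The same "add type arguments" point applies to the static factory quoted above. A minimal, self-contained sketch of how a generic factory propagates the mapper's type parameters; `MapFunction`, `GraphCsvReaderStub`, and `FromCsvDemo` are simplified stand-ins for illustration, not the actual Flink signatures:

```java
// Simplified stand-in for Flink's MapFunction interface (illustrative only).
interface MapFunction<T, O> {
    O map(T value);
}

// Stand-in for GraphCsvReader that carries the typed mapper along.
class GraphCsvReaderStub<K, VV> {
    final String path;
    final MapFunction<K, VV> mapper;

    GraphCsvReaderStub(String path, MapFunction<K, VV> mapper) {
        this.path = path;
        this.mapper = mapper;
    }
}

public class FromCsvDemo {

    // A typed factory: the compiler now ties the mapper's input to the
    // vertex-id type K and its output to the vertex-value type VV, instead
    // of accepting a raw MapFunction as in the quoted diff.
    static <K, VV> GraphCsvReaderStub<K, VV> fromCsvReader(
            String path, MapFunction<K, VV> mapper) {
        return new GraphCsvReaderStub<>(path, mapper);
    }

    public static void main(String[] args) {
        // Vertex values derived from Long ids; "edges.csv" is a placeholder path.
        GraphCsvReaderStub<Long, String> reader =
                fromCsvReader("edges.csv", id -> "v" + id);
        System.out.println(reader.mapper.map(7L)); // prints v7
    }
}
```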
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32673274

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
+    protected MapFunction mapper;

--- End diff --

add type arguments to MapFunction
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32673221

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
+    private Path edgePath;
+    private Path vertexPath;

--- End diff --

`edgePath` and `vertexPath` also seem to be unused
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r32673158

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
+import org.apache.flink.core.fs.Path;

--- End diff --

unused import