Re: Graph Analytics on HBase With HGraphDB and Apache Flink Gelly

2017-07-27 Thread Vasiliki Kalavri
Thank you for sharing!

On 28 July 2017 at 05:01, Robert Yokota  wrote:

> Also Google Cloud Bigtable has such a page at https://cloud.google.com/
> bigtable/docs/integrations
>
> On Thu, Jul 27, 2017 at 6:57 PM, Robert Yokota  wrote:
>
>>
>> One thing I really appreciate about HBase is its flexibility.  It doesn't
>> enforce a schema, but also doesn't prevent you from building a schema layer
>> on top.  It is very customizable, allowing you to push arbitrary code to
>> the server in the form of filters and coprocessors.
>>
>> Not having such higher-layer features built into HBase allows it to
>> remain flexible, but it does have a downside.  One complaint is that for
>> a new user coming to HBase, who perhaps does want to work with things like
>> query languages, schemas, secondary indices, transactions, and so forth, it
>> can be daunting to research and understand what other projects in the HBase
>> ecosystem can help him/her, how others have used such projects, and under
>> what use cases each project might be successful or not.
>>
>> Perhaps a good start would be something like an "HBase ecosystem" page at
>> the website that would list projects like Phoenix, Tephra, and others in
>> the HBase ecosystem.  The Apache TinkerPop site has a listing of projects
>> in its ecosystem at http://tinkerpop.apache.org.   I think new users
>> coming to HBase aren't even aware of the larger ecosystem, and sometimes
>> end up selecting alternative data stores as a result.
>>
>> P.S.  I'm using HBase 1.1.2
>>
>> On Thu, Jul 27, 2017 at 5:42 PM, Ted Yu  wrote:
>>
>>> Interesting blog.
>>>
>>> From your experience, is there anything on hbase side which you see room
>>> for improvement ?
>>>
>>> Which hbase release are you using ?
>>>
>>> Cheers
>>>
>>> On Thu, Jul 27, 2017 at 3:11 PM, Robert Yokota 
>>> wrote:
>>>
 In case anyone is interested, I wrote a blog on how to analyze graphs
 stored in HBase with Apache Flink Gelly:

 https://yokota.blog/2017/07/27/graph-analytics-on-hbase-with
 -hgraphdb-and-apache-flink-gelly/

>>>
>>>
>>
>


Re: Gelly - bipartite graph runs vertex-centric

2017-06-26 Thread Vasiliki Kalavri
Hi Marc,

the BipartiteGraph type doesn't support vertex-centric iterations yet. You
can either represent your bipartite graph using the Graph type, e.g. by
adding an extra attribute to the vertex value to distinguish between top
and bottom vertices, or define your own custom delta iteration on top of
BipartiteGraph.
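For the first option, a rough, untested sketch of what such a compute function could look like, assuming Long ids, Double values and messages, and a Boolean flag in the vertex value that marks the side:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.pregel.ComputeFunction;
import org.apache.flink.graph.pregel.MessageIterator;

// Vertex value: f0 = true for "top" vertices, false for "bottom"; f1 = working value.
public class BipartiteCompute
        extends ComputeFunction<Long, Tuple2<Boolean, Double>, Double, Double> {

    @Override
    public void compute(Vertex<Long, Tuple2<Boolean, Double>> vertex,
                        MessageIterator<Double> messages) {
        double sum = 0.0;
        for (Double m : messages) {
            sum += m;
        }
        if (vertex.getValue().f0) {
            // "top" vertices aggregate what the other side sent in the previous superstep
            setNewVertexValue(Tuple2.of(true, sum));
        } else {
            // "bottom" vertices forward their value along their (regular) edges
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), vertex.getValue().f1);
            }
        }
    }
}

You would then run it with graph.runVertexCentricIteration(new BipartiteCompute(), null, maxIterations).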

Best,
-Vasia.

On 23 June 2017 at 16:56, Kaepke, Marc  wrote:

> Hi,
>
> does Gelly provide a vertex-centric iteration on a bipartite graph?
>
> A bipartite graph uses BipartiteEdges, while vertex-centric iterations support
> regular edges only.
>
>
> Thanks!
>
> Best,
> Marc


Re: Graph iteration with triplets or access to edges

2017-04-28 Thread Vasiliki Kalavri
Hi Marc,

you can access the edge values inside the ScatterFunction using the
getEdges() method. For an example look at SingleSourceShortestPaths [1]
which sums up edge values to compute distances.
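A minimal, untested sketch of that pattern, assuming Long ids and Double vertex, edge, and message values:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.ScatterFunction;

// Sums the values of the edges of *this* vertex and sends the sum to each neighbor.
public class SumEdgeValuesScatter extends ScatterFunction<Long, Double, Double, Double> {

    @Override
    public void sendMessages(Vertex<Long, Double> vertex) {
        double sum = 0.0;
        List<Long> targets = new ArrayList<>();
        for (Edge<Long, Double> edge : getEdges()) {
            sum += edge.getValue();
            targets.add(edge.getTarget());
        }
        for (Long target : targets) {
            sendMessageTo(target, sum);
        }
    }
}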

I hope that helps!
-Vasia.

[1]:
https://github.com/apache/flink/blob/master/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/SingleSourceShortestPaths.java#L115

On 28 April 2017 at 12:38, Kaepke, Marc  wrote:

> to summarize my question:
> Does Flink or Gelly offer access to the edges of a single vertex?
> Or:
> I need a VertexTriplet and not an EdgeTriplet (graph.getTriplets())
>
> Thanks!
> Best,
> Marc
>
> > Am 27.04.2017 um 20:20 schrieb Kaepke, Marc  >:
> >
> > Hi everyone,
> >
> > in Gelly I use the Scatter-Gather Iteration to create clusters that depend
> on edge-values.
> >
> > During a superstep each vertex has to sum all edge values of its
> neighbors. The scatter and gather functions both work on one vertex.
> >
> > How can I get the outgoing and incoming edges of each vertex?
> > Or
> > Do the iterations work with triplets, and if so, how?
> >
> >
> > Best,
> > Marc
>
>


Re: Questions about the V-C Iteration in Gelly

2017-02-14 Thread Vasiliki Kalavri
Dear Xingcan,

no need to apologize, we are here to help :) You are always welcome to ask
questions / make suggestions.

Cheers,
-Vasia.

On 14 February 2017 at 09:35, Xingcan Cui  wrote:

> Hi Vasia,
>
> sorry that I should have read the archive before (it's already been posted
> in FLINK-1526, though with an ugly format). Now everything's clear and I
> think this thread should be closed here.
>
> Thanks. @Vasia @Greg
>
> Best,
> Xingcan
>
> On Tue, Feb 14, 2017 at 3:55 PM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Hi Xingcan,
>>
>> that's my bad, I was thinking of scatter-gather iterations in my previous
>> reply. You're right, in VertexCentricIteration a vertex is only active in
>> the next superstep if it has received at least one message in the current
>> superstep. Updating its value does not impact the activation. This is
>> intentional in the vertex-centric model.
>>
>> I agree that the current design of the iterative models is restrictive
>> and doesn't allow for the expression of complex iterative algorithms that
>> require updating edges or defining phases. We have discussed this before,
>> e.g. in [1]. The outcome of that discussion was that we should use for-loop
>> iterations for such cases, as the closed-loop iteration operators of Flink
>> might not provide the necessary flexibility. As you will see in the thread
>> though, that proposal didn't work out, as efficiently supporting for-loops
>> in Flink is not possible right now.
>>
>> -Vasia.
>>
>> [1]: http://apache-flink-mailing-list-archive.1008284.n3.
>> nabble.com/DISCUSS-Gelly-iteration-abstractions-td3949.html
>>
>> On 14 February 2017 at 08:10, Xingcan Cui  wrote:
>>
>>> Hi Greg,
>>>
>>> I also found that in VertexCentricIteration.java, the message set is
>>> taken as the workset while the vertex set is taken as the delta for
>>> solution set. Because of that, the setNewVertex method will not actually
>>> activate a vertex. In other words, if no message is generated (the workset is
>>> empty) the "pact.runtime.workset-empty-aggregator" will judge
>>> convergence of the delta iteration and then the iteration just terminates.
>>> Is this a bug?
>>>
>>> Best,
>>> Xingcan
>>>
>>>
>>> On Mon, Feb 13, 2017 at 5:24 PM, Xingcan Cui  wrote:
>>>
>>>> Hi Greg,
>>>>
>>>> Thanks for your attention.
>>>>
>>>> It took me a little time to read the old PR on FLINK-1885. Though
>>>> the VertexCentricIteration, as well as its related classes, has been
>>>> refactored, I understand what Markus wants to achieve.
>>>>
>>>> I am not sure if using a bulk iteration instead of a delta one could
>>>> eliminate the "out of memory" problem. Apart from that, I think the "auto
>>>> update" has nothing to do with the bulk mode. Considering the compatibility
>>>> guarantee, here are my suggestions to improve Gelly's iteration API:
>>>>
>>>> 1) Add an "autoHalt" flag to the ComputeFunction.
>>>>
>>>> 2) If the flag is set true (default), apply the current mechanism .
>>>>
>>>> 3) If the flag is set false, call out.collect() to update the vertex
>>>> value whether the setNewVertexValue() method is called or not, unless the
>>>> user explicitly calls a (newly added) voteToHalt() method in the
>>>> ComputeFunction.
>>>>
>>>> By adding these, users can decide when to halt a vertex themselves.
>>>> What do you think?
>>>>
>>>> As for the "update edge values during vertex iterations" problem, I
>>>> think it needs a redesign for the gelly framework (Maybe merge the vertices
>>>> and edges into a single data set? Or just change the iterations'
>>>>  implementation? I can't think it clearly now.), so that's it for now.
>>>> Besides, I don't think there will be someone who really would love to write
>>>> a graph algorithm with Flink native operators and that's why gelly is
>>>> designed, isn't it?
>>>>
>>>> Best,
>>>> Xingcan
>>>>
>>>> On Fri, Feb 10, 2017 at 10:31 PM, Greg Hogan 
>>>> wrote:
>>>>
>>>>> Hi Xingcan,
>>>>>
>>>>> FLINK-1885 looked into adding a bulk mode to Gelly's iterative models.

Re: Questions about the V-C Iteration in Gelly

2017-02-13 Thread Vasiliki Kalavri
Hi Xingcan,

that's my bad, I was thinking of scatter-gather iterations in my previous
reply. You're right, in VertexCentricIteration a vertex is only active in
the next superstep if it has received at least one message in the current
superstep. Updating its value does not impact the activation. This is
intentional in the vertex-centric model.

I agree that the current design of the iterative models is restrictive and
doesn't allow for the expression of complex iterative algorithms that
require updating edges or defining phases. We have discussed this before,
e.g. in [1]. The outcome of that discussion was that we should use for-loop
iterations for such cases, as the closed-loop iteration operators of Flink
might not provide the necessary flexibility. As you will see in the thread
though, that proposal didn't work out, as efficiently supporting for-loops
in Flink is not possible right now.

-Vasia.

[1]:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Gelly-iteration-abstractions-td3949.html

On 14 February 2017 at 08:10, Xingcan Cui  wrote:

> Hi Greg,
>
> I also found that in VertexCentricIteration.java, the message set is taken
> as the workset while the vertex set is taken as the delta for solution set.
> Because of that, the setNewVertex method will not actually activate a
> vertex. In other words, if no message is generated (the workset is empty)
> the "pact.runtime.workset-empty-aggregator" will judge convergence of the
> delta iteration and then the iteration just terminates. Is this a bug?
>
> Best,
> Xingcan
>
>
> On Mon, Feb 13, 2017 at 5:24 PM, Xingcan Cui  wrote:
>
>> Hi Greg,
>>
>> Thanks for your attention.
>>
>> It took me a little time to read the old PR on FLINK-1885. Though
>> the VertexCentricIteration, as well as its related classes, has been
>> refactored, I understand what Markus wants to achieve.
>>
>> I am not sure if using a bulk iteration instead of a delta one could
>> eliminate the "out of memory" problem. Apart from that, I think the "auto
>> update" has nothing to do with the bulk mode. Considering the compatibility
>> guarantee, here are my suggestions to improve Gelly's iteration API:
>>
>> 1) Add an "autoHalt" flag to the ComputeFunction.
>>
>> 2) If the flag is set true (default), apply the current mechanism .
>>
>> 3) If the flag is set false, call out.collect() to update the vertex
>> value whether the setNewVertexValue() method is called or not, unless the
>> user explicitly calls a (newly added) voteToHalt() method in the
>> ComputeFunction.
>>
>> By adding these, users can decide when to halt a vertex themselves. What
>> do you think?
>>
>> As for the "update edge values during vertex iterations" problem, I think
>> it needs a redesign for the gelly framework (Maybe merge the vertices and
>> edges into a single data set? Or just change the iterations'
>>  implementation? I can't think it clearly now.), so that's it for now.
>> Besides, I don't think there will be someone who really would love to write
>> a graph algorithm with Flink native operators and that's why gelly is
>> designed, isn't it?
>>
>> Best,
>> Xingcan
>>
>> On Fri, Feb 10, 2017 at 10:31 PM, Greg Hogan  wrote:
>>
>>> Hi Xingcan,
>>>
>>> FLINK-1885 looked into adding a bulk mode to Gelly's iterative models.
>>>
>>> As an alternative you could implement your algorithm with Flink
>>> operators and a bulk iteration. Most of the Gelly library is written with
>>> native operators.
>>>
>>> Greg
>>>
>>> On Fri, Feb 10, 2017 at 5:02 AM, Xingcan Cui  wrote:
>>>
>>>> Hi Vasia,
>>>>
>>>> b) As I said, when some vertices have finished their work in the current phase,
>>>> they have nothing to do (no value updates, no messages received, as if
>>>> asleep) but wait for other vertices that have not finished (the current
>>>> phase) yet. Then, in the next phase, all the vertices should go back
>>>> to work again, and if some vertices became inactive in the last phase,
>>>> it could be hard to reactivate them by message since we don't even know
>>>> which vertices to send to. The only solution is to keep all vertices
>>>> active, whether by updating vertex values in each superstep or by sending
>>>> heartbeat messages to the vertices themselves (which will bring a lot of extra
>>>> work to the MessageCombiner).
>>>>
>>>> c) I know it's

Re: Questions about the V-C Iteration in Gelly

2017-02-10 Thread Vasiliki Kalavri
Hi Xingcan,

On 9 February 2017 at 18:16, Xingcan Cui  wrote:

> Hi Vasia,
>
> thanks for your reply. It helped a lot and I got some new ideas.
>
> a) As you said, I did use the getPreviousIterationAggregate() method in
> preSuperstep() of the next superstep.
> However, if the (only?) global (aggregate) results cannot be guaranteed
> to be consistent, what should we
> do with the postSuperstep() method?
>

The postSuperstep() method is analogous to the close() method in a
RichFunction, which is typically used for cleanup.



>
> b) Though we can activate vertices via the update method or messages, IMO, it may
> be more proper for users
> themselves to decide when to halt a vertex's iteration. Consider a
> complex algorithm that contains different
> phases inside a vertex-centric iteration. Before moving to the next phase
> (which should be synchronized),
> there may be some vertices that have already finished their work in the current
> phase and just wait for others.
> Users may prefer to let the finished vertices idle until the next phase
> rather than halt them.
> Can we consider adding the voteToHalt() method and some internal variables
> to the Vertex/Edge class
> (or just create an "advanced" version of them) to make the halting more
> controllable?
>


I suppose adding a voteToHalt() method is possible, but I'm not sure I see
how that would make halting more controllable. If a vertex hasn't changed
value or hasn't received a message, it has no work to do in the next
iteration, so why keep it active? If in a later superstep, a previously
inactive vertex receives a message, it will become active again. Is this
what you're looking for or am I missing something?



>
> c) Sorry that I didn't make it clear before. Here the initialization means
> a "global" one that executes once
> before the iteration. For example, users may want to initialize the
> vertices' values by their adjacent edges
> before the iteration starts. Maybe we can add an extra coGroupFunction to
> the configuration parameters
> and apply it before the iteration?
>


You can initialize the graph by using any Gelly transformation methods
before starting the iteration, e.g. mapVertices, mapEdges, reduceOnEdges,
etc.
Btw, a vertex can iterate over its edges inside the ComputeFunction using
the getEdges() method. Initializing the vertex values with neighboring
edges might not be a good idea if you have vertices with high degrees.
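For example, a rough, untested sketch of initializing vertex values from their adjacent edge values with reduceOnEdges() and joinWithVertices(), assuming a Graph<Long, Double, Double> named graph:

// sum the values of each vertex's out-edges
DataSet<Tuple2<Long, Double>> edgeSums = graph.reduceOnEdges(
        new ReduceEdgesFunction<Double>() {
            @Override
            public Double reduceEdges(Double firstEdgeValue, Double secondEdgeValue) {
                return firstEdgeValue + secondEdgeValue;
            }
        }, EdgeDirection.OUT);

// use the per-vertex sum as the initial vertex value before starting the iteration
Graph<Long, Double, Double> initialized = graph.joinWithVertices(
        edgeSums,
        new VertexJoinFunction<Double, Double>() {
            @Override
            public Double vertexJoin(Double vertexValue, Double edgeSum) {
                return edgeSum;
            }
        });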


Cheers,
-Vasia.



>
> What do you think?
>
> (BTW, I started a PR on FLINK-1526(MST Lib&Example). Considering the
> complexity, the example is not
> provided.)
>
> I really appreciate all your help.
>
> Best,
> Xingcan
>
> On Thu, Feb 9, 2017 at 5:36 PM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Hi Xingcan,
>>
>> On 7 February 2017 at 10:10, Xingcan Cui  wrote:
>>
>>> Hi all,
>>>
>>> I got some questions about the vertex-centric iteration in Gelly.
>>>
>>> a)  It seems the postSuperstep method is called before the superstep
>>> barrier (I got different aggregate values of the same superstep in this
>>> method). Is this a bug, or is the design just like that?
>>>
>>
>> The postSuperstep() method is called inside the close() method of a
>> RichCoGroupFunction that wraps the ComputeFunction. The close() method
>> is called after the last call to coGroup() in each iteration
>> superstep.
>> The aggregate values are not guaranteed to be consistent during the same
>> superstep when they are computed. To retrieve an aggregate value for
>> superstep i, you should use the getPreviousIterationAggregate() method
>> in superstep i+1.
>>
>>
>>>
>>> b) There is no setHalt method for vertices. When no message is received, a
>>> vertex just quits the next iteration. Should I manually send messages (like
>>> heartbeat) to keep the vertices active?
>>>
>>
>> That's because vertex halting is implicitly controlled by the underlying
>> delta iterations of Flink. A vertex will remain active as long as it
>> receives a message or it updates its value, otherwise it will become
>> inactive. The documentation on Gelly iterations [1] and DataSet iterations
>> [2] might be helpful.
>>
>>
>>
>>>
>>> c) I think we may need an initialization method in the ComputeFunction.
>>>
>>
>>
>> There exists a preSuperstep() method for initialization. This one will
>> be executed once per superstep before the compute function is invoked for
>> every vertex. Would this work for you?
>>
>>
>>
>>>
>>> Any opinions? Thanks.
>>>
>>> Best,
>>> Xingcan
>>>
>>>
>>>
>> I hope this helps,
>> -Vasia.
>>
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>> dev/libs/gelly/iterative_graph_processing.html#vertex-centric-iterations
>> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>> dev/batch/iterations.html
>>
>>
>


Re: Questions about the V-C Iteration in Gelly

2017-02-09 Thread Vasiliki Kalavri
Hi Xingcan,

On 7 February 2017 at 10:10, Xingcan Cui  wrote:

> Hi all,
>
> I got some questions about the vertex-centric iteration in Gelly.
>
> a)  It seems the postSuperstep method is called before the superstep
> barrier (I got different aggregate values of the same superstep in this
> method). Is this a bug, or is the design just like that?
>

The postSuperstep() method is called inside the close() method of a
RichCoGroupFunction that wraps the ComputeFunction. The close() method
is called after the last call to coGroup() in each iteration
superstep.
The aggregate values are not guaranteed to be consistent during the same
superstep when they are computed. To retrieve an aggregate value for
superstep i, you should use the getPreviousIterationAggregate() method in
superstep i+1.


>
> b) There is no setHalt method for vertices. When no message is received, a
> vertex just quits the next iteration. Should I manually send messages (like
> heartbeat) to keep the vertices active?
>

That's because vertex halting is implicitly controlled by the underlying
delta iterations of Flink. A vertex will remain active as long as it
receives a message or it updates its value, otherwise it will become
inactive. The documentation on Gelly iterations [1] and DataSet iterations
[2] might be helpful.



>
> c) I think we may need an initialization method in the ComputeFunction.
>


There exists a preSuperstep() method for initialization. This one will be
executed once per superstep before the compute function is invoked for
every vertex. Would this work for you?
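A rough, untested sketch of how this could look, with hypothetical names, assuming Long ids, Double values and messages, and that the aggregator accessors are exposed on the ComputeFunction:

// when configuring the iteration:
// VertexCentricConfiguration parameters = new VertexCentricConfiguration();
// parameters.registerAggregator("updates", new LongSumAggregator());
// graph.runVertexCentricIteration(new MyCompute(), null, maxIterations, parameters);

public class MyCompute extends ComputeFunction<Long, Double, Double, Double> {

    private long updatesInPreviousSuperstep;

    @Override
    public void preSuperstep() {
        // runs once per superstep, before compute() is invoked for any vertex
        LongValue previous = getPreviousIterationAggregate("updates");
        updatesInPreviousSuperstep = (previous == null) ? 0L : previous.getValue();
    }

    @Override
    public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
        double min = vertex.getValue();
        for (Double m : messages) {
            min = Math.min(min, m);
        }
        if (min < vertex.getValue()) {
            setNewVertexValue(min);
            // count how many vertices updated their value in this superstep
            LongSumAggregator agg = getIterationAggregator("updates");
            agg.aggregate(1);
            // (message sending to neighbors omitted for brevity)
        }
    }
}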



>
> Any opinions? Thanks.
>
> Best,
> Xingcan
>
>
>
I hope this helps,
-Vasia.


[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/gelly/iterative_graph_processing.html#vertex-centric-iterations
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/iterations.html


Re: Apache Flink 1.1.4 - Gelly - LocalClusteringCoefficient - Returning values above 1?

2017-01-23 Thread Vasiliki Kalavri
.0)
> (5113,26284,1.0)
> (8056,5113,1.0)
> (5113,8056,1.0)
> (10371,5113,1.0)
> (5113,10371,1.0)
> (16785,5113,1.0)
> (5113,16785,1.0)
> (19801,5113,1.0)
> (5113,19801,1.0)
> (6715,5113,1.0)
> (5113,6715,1.0)
> (31724,5113,1.0)
> (5113,31724,1.0)
> (32443,5113,1.0)
> (5113,32443,1.0)
> (10370,5113,1.0)
> (5113,10370,1.0)
>
> Any insight into what I may be doing wrong would be greatly appreciated.
>
> Thanks for your time,
>
> Kind regards,
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com 
> Skype: miguel.e.coimbra
>
> On 20 January 2017 at 19:31, Greg Hogan  wrote:
>
>> Hi Miguel,
>>
>> The '--output print' option describes the values and also displays the
>> local clustering coefficient value.
>>
>> You're running the undirected algorithm on a directed graph. In 1.2 there
>> is an option '--simplify true' that will add reverse edges and remove
>> duplicate edges and self-loops. Alternatively, it looks like you could
>> simply add reverse edges to your input file (with an optional ' | sort |
>> uniq' following):
>>
>> $ cat edges.txt | awk ' { print $1, $2; print $2, $1 } '
>>
>> The drivers are being reworked for 1.3 to better reuse code and options
>> which will better support additional drivers and algorithms and make
>> documentation simpler.
>>
>> Greg
>>
>> On Fri, Jan 20, 2017 at 2:06 PM, Vasiliki Kalavri <
>> vasilikikala...@gmail.com> wrote:
>>
>>> Hi Miguel,
>>>
>>> the LocalClusteringCoefficient algorithm returns a DataSet of type
>>> Result, which basically wraps a vertex id, its degree, and the number
>>> of triangles containing this vertex. The number 11 you see is indeed the
>>> degree of vertex 5113. The Result type contains the method
>>> getLocalClusteringCoefficientScore() which allows you to retrieve the
>>> clustering coefficient score for a vertex. The method simply divides the
>>> number of triangles by the number of potential edges between neighbors.
>>>
>>> I'm sorry that this is not clear in the docs. We should definitely
>>> improve them to explain what is the output and how to retrieve the actual
>>> clustering coefficient values. I have opened a JIRA for this [1].
>>>
>>> Cheers,
>>> -Vasia.
>>>
>>> [1]: https://issues.apache.org/jira/browse/FLINK-5597
>>>
>>> On 20 January 2017 at 19:31, Miguel Coimbra 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> In the documentation of the LocalClusteringCoefficient algorithm, it
>>>> is said:
>>>>
>>>>
>>>> *The local clustering coefficient measures the connectedness of each
>>>> vertex’s neighborhood.Scores range from 0.0 (no edges between neighbors) to
>>>> 1.0 (neighborhood is a clique).*
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>>>> apis/batch/libs/gelly.html#local-clustering-coefficient
>>>> <https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/gelly/library_methods.html#local-clustering-coefficient>
>>>>
>>>> However, upon running the algorithm (undirected version), I obtained
>>>> values above 1.
>>>>
>>>> The result I got was this. As you can see, vertex 5113 has a score of
>>>> 11:
>>>> (the input edges for the graph are shown further below - around *35
>>>> edges*):
>>>>
>>>> (4907,(1,0))
>>>> *(5113,(11,0))*
>>>> (6008,(0,0))
>>>> (6064,(1,0))
>>>> (6065,(1,0))
>>>> (6107,(0,0))
>>>> (6192,(0,0))
>>>> (6252,(1,0))
>>>> (6279,(1,0))
>>>> (6465,(1,0))
>>>> (6545,(0,0))
>>>> (6707,(1,0))
>>>> (6715,(1,0))
>>>> (6774,(0,0))
>>>> (7088,(0,0))
>>>> (7089,(1,0))
>>>> (7171,(0,0))
>>>> (7172,(1,0))
>>>> (7763,(0,0))
>>>> (7976,(1,0))
>>>> (8056,(1,0))
>>>> (9748,(1,0))
>>>> (10191,(1,0))
>>>> (10370,(1,0))
>>>> (10371,(1,0))
>>>> (14310,(1,0))
>>>> (16785,(1,0))
>>>> (19801,(1,0))
>>>> (26284,(1,0))
>>>> (26562,(0,0))
>>>> (31724,(1,0))
>>>> (32443,(1,0))
>>>> (32938,(0,0))
>>>> (33855,(1,0))
>>>> (37929,(0,0))
>>>>
>>>> This was from a small isolated test with these edges:
>>>>
>>>> 5113 6008
>>>> 5113 6774
>>>> 5113 32938
>>>> 5113 6545
>>>> 5113 7088
>>>> 5113 37929
>>>> 5113 26562
>>>> 5113 6107
>>>> 5113 7171
>>>> 5113 6192
>>>> 5113 7763
>>>> 9748 5113
>>>> 10191 5113
>>>> 6064 5113
>>>> 6065 5113
>>>> 6279 5113
>>>> 4907 5113
>>>> 6465 5113
>>>> 6707 5113
>>>> 7089 5113
>>>> 7172 5113
>>>> 14310 5113
>>>> 6252 5113
>>>> 33855 5113
>>>> 7976 5113
>>>> 26284 5113
>>>> 8056 5113
>>>> 10371 5113
>>>> 16785 5113
>>>> 19801 5113
>>>> 6715 5113
>>>> 31724 5113
>>>> 32443 5113
>>>> 10370 5113
>>>>
>>>> I am not sure what I may be doing wrong, but is there perhaps some form
>>>> of normalization lacking in my execution of:
>>>>
>>>> org.apache.flink.graph.library.clustering.undirected.LocalCl
>>>> usteringCoefficient.Result;
>>>> org.apache.flink.graph.library.clustering.undirected.LocalCl
>>>> usteringCoefficient;
>>>>
>>>> Am I supposed to divide all scores by the greatest score obtained by
>>>> the algorithm?
>>>>
>>>> Thank you very much!
>>>>
>>>> Miguel E. Coimbra
>>>> Email: miguel.e.coim...@gmail.com 
>>>> Skype: miguel.e.coimbra
>>>>
>>>
>>>
>>
>


Re: Apache Flink 1.1.4 - Gelly - LocalClusteringCoefficient - Returning values above 1?

2017-01-20 Thread Vasiliki Kalavri
Hi Miguel,

the LocalClusteringCoefficient algorithm returns a DataSet of type Result,
which basically wraps a vertex id, its degree, and the number of triangles
containing this vertex. The number 11 you see is indeed the degree of
vertex 5113. The Result type contains the method
getLocalClusteringCoefficientScore() which allows you to retrieve the
clustering coefficient score for a vertex. The method simply divides the
number of triangles by the number of potential edges between neighbors.

I'm sorry that this is not clear in the docs. We should definitely
improve them to explain what is the output and how to retrieve the actual
clustering coefficient values. I have opened a JIRA for this [1].
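For illustration, a hedged sketch of retrieving the scores, assuming a simple undirected Graph<LongValue, NullValue, NullValue> named graph (the exact generic bounds of the algorithm may differ between releases):

DataSet<LocalClusteringCoefficient.Result<LongValue>> results =
        graph.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());

results.map(new MapFunction<LocalClusteringCoefficient.Result<LongValue>, Double>() {
    @Override
    public Double map(LocalClusteringCoefficient.Result<LongValue> result) {
        // triangle count divided by the number of potential edges among the neighbors
        return result.getLocalClusteringCoefficientScore();
    }
}).print();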

Cheers,
-Vasia.

[1]: https://issues.apache.org/jira/browse/FLINK-5597

On 20 January 2017 at 19:31, Miguel Coimbra 
wrote:

> Hello,
>
> In the documentation of the LocalClusteringCoefficient algorithm, it is
> said:
>
>
> *The local clustering coefficient measures the connectedness of each
> vertex’s neighborhood.Scores range from 0.0 (no edges between neighbors) to
> 1.0 (neighborhood is a clique).*
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/
> apis/batch/libs/gelly.html#local-clustering-coefficient
> 
>
> However, upon running the algorithm (undirected version), I obtained
> values above 1.
>
> The result I got was this. As you can see, vertex 5113 has a score of 11:
> (the input edges for the graph are shown further below - around *35 edges*
> ):
>
> (4907,(1,0))
> *(5113,(11,0))*
> (6008,(0,0))
> (6064,(1,0))
> (6065,(1,0))
> (6107,(0,0))
> (6192,(0,0))
> (6252,(1,0))
> (6279,(1,0))
> (6465,(1,0))
> (6545,(0,0))
> (6707,(1,0))
> (6715,(1,0))
> (6774,(0,0))
> (7088,(0,0))
> (7089,(1,0))
> (7171,(0,0))
> (7172,(1,0))
> (7763,(0,0))
> (7976,(1,0))
> (8056,(1,0))
> (9748,(1,0))
> (10191,(1,0))
> (10370,(1,0))
> (10371,(1,0))
> (14310,(1,0))
> (16785,(1,0))
> (19801,(1,0))
> (26284,(1,0))
> (26562,(0,0))
> (31724,(1,0))
> (32443,(1,0))
> (32938,(0,0))
> (33855,(1,0))
> (37929,(0,0))
>
> This was from a small isolated test with these edges:
>
> 5113 6008
> 5113 6774
> 5113 32938
> 5113 6545
> 5113 7088
> 5113 37929
> 5113 26562
> 5113 6107
> 5113 7171
> 5113 6192
> 5113 7763
> 9748 5113
> 10191 5113
> 6064 5113
> 6065 5113
> 6279 5113
> 4907 5113
> 6465 5113
> 6707 5113
> 7089 5113
> 7172 5113
> 14310 5113
> 6252 5113
> 33855 5113
> 7976 5113
> 26284 5113
> 8056 5113
> 10371 5113
> 16785 5113
> 19801 5113
> 6715 5113
> 31724 5113
> 32443 5113
> 10370 5113
>
> I am not sure what I may be doing wrong, but is there perhaps some form of
> normalization lacking in my execution of:
>
> org.apache.flink.graph.library.clustering.undirected.
> LocalClusteringCoefficient.Result;
> org.apache.flink.graph.library.clustering.undirected.
> LocalClusteringCoefficient;
>
> Am I supposed to divide all scores by the greatest score obtained by the
> algorithm?
>
> Thank you very much!
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com 
> Skype: miguel.e.coimbra
>


Re: Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-01-18 Thread Vasiliki Kalavri
Great! Let us know if you need help.

-Vasia.



On 17 January 2017 at 10:30, Miguel Coimbra 
wrote:

> Hello Vasia,
>
> I am going to look into this.
> Hopefully I will contribute to the implementation and documentation.
>
> Regards,
>
> -- Forwarded message ------
> From: Vasiliki Kalavri 
> To: user@flink.apache.org
> Cc:
> Date: Sun, 15 Jan 2017 18:01:41 +0100
> Subject: Re: Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 -
> java.lang.NullPointerException
> Hi Miguel,
>
> this is a bug, thanks a lot for reporting! I think the problem is that the
> implementation assumes that labelsWithHighestScores contains the vertex
> itself as initial label.
>
> Could you please open a JIRA ticket for this and attach your code and data
> as an example to reproduce? We should also improve the documentation for
> this library method. I see that you are initializing vertex values and you
> have called getUndirected(), but the library method already does both of
> these operations internally.
>
> Cheers,
> -Vasia.
>
> On 13 January 2017 at 17:12, Miguel Coimbra 
> wrote:
> Hello,
>
> If I missed the answer to this or some essential step of the
> documentation, please do tell.
> I am having the following problem while trying out the
> org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly
> API (Java).
>
> Specs: JDK 1.8.0_102 x64
> Apache Flink: 1.1.4
>
> Suppose I have a very small (I tried with an example with 38 vertices as
> well) dataset stored in a tab-separated file 3-vertex.tsv:
>
> #id1 id2 score
> 0 1 0
> 0 2 0
> 0 3 0
>
> This is just a central vertex with 3 neighbors (disconnected between
> themselves).
> I am loading the dataset and executing the algorithm with the following
> code:
>
>
> ---
> // Load the data from the .tsv file.
> final DataSet<Tuple3<Long, Long, Double>> edgeTuples =
> env.readCsvFile(inputPath)
> .fieldDelimiter("\t") // node IDs are separated by spaces
> .ignoreComments("#")  // comments start with "%"
> .types(Long.class, Long.class, Double.class);
>
> // Generate a graph and add reverse edges (undirected).
> final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples,
> new MapFunction<Long, Long>() {
> private static final long serialVersionUID =
> 8713516577419451509L;
> public Long map(Long value) {
> return value;
> }
> },
> env).getUndirected();
>
> // CommunityDetection parameters.
> final double hopAttenuationDelta = 0.5d;
> final int iterationCount = 10;
>
> // Prepare and trigger the execution.
> DataSet<Vertex<Long, Long>> vs = graph.run(new
> org.apache.flink.graph.library.CommunityDetection(iterationCount,
> hopAttenuationDelta)).getVertices();
>
> vs.print();
> ---
>
> Running this code throws the following exception (check the bold line):
>
> org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$hand
> leMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$hand
> leMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$hand
> leMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.lifte
> dTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(F
> uture.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.
> exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.j
> ava:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExec
> All(ForkJoinPool.java:1253)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(For
> kJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
> l.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
> orkerThread.java:107)
>
> *Caused by: java.lang.NullPointerException at
> org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)*
> at org.apache.flink.graph.spargel.ScatterGatherIteration$Gather
> UdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> at org.apache.flink.runtime.op

Re: Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-01-16 Thread Vasiliki Kalavri
Hi Miguel,

thank you for opening the issue!
Changes/improvements to the documentation are also typically handled with
JIRAs and pull requests [1]. Would you like to give it a try and improve
the community detection docs?

Cheers,
-Vasia.

[1]: https://flink.apache.org/contribute-documentation.html

On 16 January 2017 at 12:58, Miguel Coimbra 
wrote:

> Hello,
>
> I created the JIRA issue at:
> https://issues.apache.org/jira/browse/FLINK-5506
>
> Is it possible to submit suggestions to the documentation?
> If so, where can I do so?
>
> I actually did this based on the example at this page (possible Flink
> versions aside):
>
> https://flink.apache.org/news/2015/08/24/introducing-flink-
> gelly.html#use-case-music-profiles
>
> From the documentation I assumed that CommunityDetection has the same
> internal semantics as LabelPropagation (minus the algorithm difference
> itself).
> It would be relevant to mention that it is not necessary to generate IDs
> (as in the music example) and that an undirected representation of the
> graph is generated before the algorithm is executed.
>
> Kind regards,
>
>
> Miguel E. Coimbra
> Email: miguel.e.coim...@gmail.com 
> Skype: miguel.e.coimbra
>


Re: Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-01-15 Thread Vasiliki Kalavri
Hi Miguel,

this is a bug, thanks a lot for reporting! I think the problem is that the
implementation assumes that labelsWithHighestScores contains the vertex
itself as initial label.

Could you please open a JIRA ticket for this and attach your code and data
as an example to reproduce? We should also improve the documentation for
this library method. I see that you are initializing vertex values and you
have called getUndirected(), but the library method already does both of
these operations internally.
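A sketch of the simplified call, keeping the rest of the code from the report but without getUndirected() (untested, based on the 1.1 API as used above):

final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples,
        new MapFunction<Long, Long>() {
            public Long map(Long vertexId) {
                // use the vertex id as the initial label
                return vertexId;
            }
        }, env);

DataSet<Vertex<Long, Long>> vs = graph
        .run(new org.apache.flink.graph.library.CommunityDetection(iterationCount, hopAttenuationDelta))
        .getVertices();

vs.print();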

Cheers,
-Vasia.

On 13 January 2017 at 17:12, Miguel Coimbra 
wrote:

> Hello,
>
> If I missed the answer to this or some essential step of the
> documentation, please do tell.
> I am having the following problem while trying out the
> org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly
> API (Java).
>
> Specs: JDK 1.8.0_102 x64
> Apache Flink: 1.1.4
>
> Suppose I have a very small (I tried with an example with 38 vertices as
> well) dataset stored in a tab-separated file 3-vertex.tsv:
>
> #id1 id2 score
> 0 1 0
> 0 2 0
> 0 3 0
>
> This is just a central vertex with 3 neighbors (disconnected between
> themselves).
> I am loading the dataset and executing the algorithm with the following
> code:
>
>
> ---
> // Load the data from the .tsv file.
> final DataSet<Tuple3<Long, Long, Double>> edgeTuples =
> env.readCsvFile(inputPath)
> .fieldDelimiter("\t") // node IDs are separated by spaces
> .ignoreComments("#")  // comments start with "%"
> .types(Long.class, Long.class, Double.class);
>
> // Generate a graph and add reverse edges (undirected).
> final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples,
> new MapFunction<Long, Long>() {
> private static final long serialVersionUID =
> 8713516577419451509L;
> public Long map(Long value) {
> return value;
> }
> },
> env).getUndirected();
>
> // CommunityDetection parameters.
> final double hopAttenuationDelta = 0.5d;
> final int iterationCount = 10;
>
> // Prepare and trigger the execution.
> DataSet<Vertex<Long, Long>> vs = graph.run(new org.apache.flink.graph.
> library.CommunityDetection(iterationCount, hopAttenuationDelta)).
> getVertices();
>
> vs.print();
> ---
>
> Running this code throws the following exception (check the bold line):
>
> org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$
> mcV$sp(JobManager.scala:805)
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> pollAndExecAll(ForkJoinPool.java:1253)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
> *Caused by: java.lang.NullPointerException at
> org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)*
> at org.apache.flink.graph.spargel.ScatterGatherIteration$
> GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDr
> iver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> at org.apache.flink.runtime.operators.BatchTask.run(
> BatchTask.java:486)
> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(
> AbstractIterativeTask.java:146)
> at org.apache.flink.runtime.iterative.task.IterationTailTask.run(
> IterationTailTask.java:107)
> at org.apache.flink.runtime.operators.BatchTask.invoke(
> BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
> at java.lang.Thread.run(Thread.java:745)​
>
>
> After a further look, I set a breakpoint (Eclipse IDE debugging) at the
> line in bold:
>
> org.apache.flink.graph.library.CommunityDetection.java (source code
> accessed automatically by Maven)
> // find the highest score of maxScoreLabel
> *double highestScore = labelsWithHighestScore.get(maxScoreLabel);*
>
> - maxScoreLabel has the value 3.
>
> 

Re: Executing graph algorithms on Gelly that are larger then memmory

2016-11-30 Thread Vasiliki Kalavri
Hi,

can you give us some more details about the algorithm you are testing and
your configuration?

Flink DataSet operators like join, coGroup, reduce, etc. spill to disk if
there is not enough memory. If you are using a delta iteration operator
though, the state that is kept across iterations (solution set) must fit in
memory.
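One possibly related knob, mentioned here only as an assumption on my part: Gelly's iteration configuration can keep the solution set as an unmanaged object map on the heap instead of in Flink's managed-memory hash table. The solution set still has to fit in memory; it just bypasses the managed-memory segments. A sketch, assuming a scatter-gather iteration:

ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
// keep the solution set in a Java hash map on the heap instead of managed memory
parameters.setSolutionSetUnmanagedMemory(true);
// pass 'parameters' as the last argument of runScatterGatherIteration(...)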

-Vasia.

On 28 November 2016 at 21:44, otherwise777  wrote:

> Small addition, i'm currently running the programs via my IDE intelij
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Executing-graph-
> algorithms-on-Gelly-that-are-larger-then-memmory-tp10358p10359.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Type of TypeVariable 'K' in 'class <> could not be determined

2016-11-18 Thread Vasiliki Kalavri
Thanks Timo for the explanation and Wouter for reporting this. I've located
two more instances of this in the Graph class and created FLINK-5097. I'll
ping you after I open the PR if that's OK.

-Vasia.

On 18 November 2016 at 12:22, Timo Walther  wrote:

> Yes. I don't know if it solves this problem, but in general, if the input
> type is known it should be passed for input type inference.
>
> Am 18/11/16 um 11:28 schrieb Vasiliki Kalavri:
>
> Hi Timo,
>
> thanks for looking into this! Are you referring to the 4th argument in [1]?
>
> Thanks,
> -Vasia.
>
> [1]: https://github.com/apache/flink/blob/master/flink-
> libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java#L506
>
> On 18 November 2016 at 10:25, Timo Walther  wrote:
>
>> I think I identified the problem. Input type inference can not be used
>> because of missing information.
>>
>> @Vasia: Why is the TypeExtractor in Graph. mapVertices(mapper) called
>> without information about the input type? Isn't the input of the
>> MapFunction known at this point? ( vertices.getType())
>>
>> Am 17/11/16 um 20:24 schrieb otherwise777:
>>
>> The one that's currently in my github will give you the error,
>>>
>>> In my other file i made a really ugly workaround by adding the element
>>> in an
>>> ArrayList as a single item.
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-flink-user-maili
>>> ng-list-archive.2336050.n4.nabble.com/Type-of-TypeVariable-K
>>> -in-class-could-not-be-determined-tp10173p10184.html
>>> Sent from the Apache Flink User Mailing List archive. mailing list
>>> archive at Nabble.com.
>>>
>>
>
>
> --
> Freundliche Grüße / Kind Regards
>
> Timo Walther
>
> Follow me: @twalthrhttps://www.linkedin.com/in/twalthr
>
>


Re: Type of TypeVariable 'K' in 'class <> could not be determined

2016-11-18 Thread Vasiliki Kalavri
Hi Timo,

thanks for looking into this! Are you referring to the 4th argument in [1]?

Thanks,
-Vasia.

[1]: https://github.com/apache/flink/blob/master/
flink-libraries/flink-gelly/src/main/java/org/apache/
flink/graph/Graph.java#L506

On 18 November 2016 at 10:25, Timo Walther  wrote:

> I think I identified the problem. Input type inference can not be used
> because of missing information.
>
> @Vasia: Why is the TypeExtractor in Graph. mapVertices(mapper) called
> without information about the input type? Isn't the input of the
> MapFunction known at this point? ( vertices.getType())
>
> Am 17/11/16 um 20:24 schrieb otherwise777:
>
> The one that's currently in my github will give you the error,
>>
>> In my other file i made a really ugly workaround by adding the element in
>> an
>> ArrayList as a single item.
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Type-of-TypeVariable-
>> K-in-class-could-not-be-determined-tp10173p10184.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>


Re: Type of TypeVariable 'K' in 'class <> could not be determined

2016-11-17 Thread Vasiliki Kalavri
Hi Wouter,

with InitVerticesMapper() are you trying to map the vertex value to a
Tuple2 or to a Double?
Your mapper is turning the vertex values into a Tuple2<> but your
scatter-gather UDFs are defining Double vertex values.

-Vasia.

On 17 November 2016 at 14:03, otherwise777  wrote:

> Hello timo,
>
> the whole project is on github:
> https://github.com/otherwise777/Temporal_Graph_library
> 
> The Tgraphalgorithm is here:
> https://github.com/otherwise777/Temporal_Graph_
> library/blob/master/src/main/java/Tgraphs/TGraphAlgorithm.java
>  library/blob/master/src/main/java/Tgraphs/TGraphAlgorithm.java>
>
> I just updated Flink and Gelly to 1.1.3 with Maven but the problem still
> occurs
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Type-of-
> TypeVariable-K-in-class-could-not-be-determined-tp10173p10175.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: 33 segments problem with configuration set

2016-11-16 Thread Vasiliki Kalavri
Dear Wouter,

first of all, as I noted in another thread already, betweenness centrality
is an extremely demanding algorithm and a distributed data engine such as
Flink is probably not the best system to implement it into. On top of that,
the message-passing model for graph computations would generate an enormous
amount of messages which translates to high memory requirements.

Now, looking at your code, there is a nested loop, which Flink cannot
handle. That is, you have a for-loop and inside that you're running a Flink
iteration. Neither for-loops nor nested loops are currently supported in
Flink, thus, I believe you will have to re-think the logic of your
algorithm.

Cheers,
-Vasia.

On 16 November 2016 at 15:10, otherwise777  wrote:

> Some additional information i just realized, it crashes on this line of
> code:
> collectionDataSet.print();
>
> I tried placing it inside of the loop, it crashes at the 7th iteration now
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/33-segments-
> problem-with-configuration-set-tp10144p10149.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Too few memory segments provided. Hash Table needs at least 33 memory segments.

2016-11-15 Thread Vasiliki Kalavri
Hi Miguel,

I'm sorry for the late reply; this e-mail got stuck in my spam folder. I'm
glad that you've found a solution :)

I've never used flink with docker, so I'm probably not the best person to
advise you on this. However, if I understand correctly, you're changing the
configuration before submitting the job but while the flink cluster is
already running. I don't know if docker is supposed to do something
differently, but after a flink cluster has been started, nodes won't reload
any changes you make to the flink-conf.yaml. You'll either have to make
your changes before starting the cluster or re-start.

Cheers,
-Vasia.

On 14 November 2016 at 18:33, Miguel Coimbra 
wrote:

> Hello,
>
> I believe I have figured this out.
>
> First, I tried Aandrey Melentyev's suggestion of executing with Apache
> Flink 1.1.3, both with default conf/flink-conf.yaml parameters as well as
> with some changes to provide additional memory. However, the same error
> happened.
>
> Note: I changed my project's pom.xml and generated the .jar again using
> Maven.
> I also copied the new .jar to both Docker instances.
>
> The test machine has 256 GB RAM and it is a scenario of two Docker
> containers.
> I send attached the relevant parts of the logs of the JobManager and of
> the TaskManager.
> Regarding memory in the TaskManager log, I was looking at a couple of
> executions and noticed something strange:
>
> 2016-11-14 15:48:45,256 INFO  org.apache.flink.runtime.io.ne
> twork.buffer.NetworkBufferPool  - Allocated 64 MB for network buffer pool
> (number of memory segments: 2048, bytes per segment: 32768).
> 2016-11-14 15:48:45,413 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager  - Limiting managed memory to 0.7 of the
> currently free heap space (310 MB), memory will be allocated lazily.
>
> After that, I looked at the start of the TaskManager log and found this:
>
> 2016-11-14 15:48:38,843 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager  -  Starting TaskManager (Version: 1.1.3,
> Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
> 2016-11-14 15:48:38,843 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager  -  Current user: flink
> 2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager  -  JVM: OpenJDK 64-Bit Server VM - Oracle
> Corporation - 1.8/25.92-b14
> 2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager  -  Maximum heap size: 512 MiBytes
> 2016-11-14 15:48:38,844 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager  -  JAVA_HOME:
> /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager  -  Hadoop version: 2.7.2
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager  -  JVM Options:
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager  - -XX:+UseG1GC
>
>
> *2016-11-14 15:48:38,850 INFO
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xms512M
> 2016-11-14 15:48:38,850 INFO
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xmx512M*
> 2016-11-14 15:48:38,850 INFO  org.apache.flink.runtime.taskm
> anager.TaskManager  - -XX:MaxDirectMemorySize=8388607T
>
> It seems it is running with only 512 MB, which is the default.
> This in spite of me having edited the flink-conf.yaml file before
> invoking the program for the cluster.
> I looked at the log of the JobManager and the same thing happened: it was
> using the default 256 MB instead of my 1024MB.
>
> - To recap, I built the Docker Flink image with (I send the Dockerfile
> attached):
>
> cd docker-flink-image-builder/
> ls
> Dockerfile  Dockerfile~  README.md  README.md~
> bluemix-docker-compose.sh*  build.sh*  docker-compose-bluemix.yml
> ./build.sh
>
> The only file I changed from those is the Dockerfile.
> This set of files was obtained from the Flink repository.
> I used docker-compose up to start the standalone cluster:
>
> screen
> cd docker-flink-image-builder/
> ls
> Dockerfile  Dockerfile~  README.md  README.md~
> bluemix-docker-compose.sh*  build.sh*  docker-compose-bluemix.yml
> docker-compose.yml  docker-entrypoint.sh*
> docker-compose up
>
> Then I accessed each Docker instance:
>
> docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}})
> /bin/sh
> docker exec -it $(docker ps --filter 
> name=dockerflinkimagebuilder_taskmanager_1
> --format={{.ID}}) /bin/sh
>
> While inside each of those, I started a bash shell and changed the config
> file like so:
>
> bash
> cd /home/myuser/docker-image-build-context/flink-1.1.3/conf
> vi flink-conf.yaml
>
> I have edited (on both the JobManager and the TaskManager) the following
> settings:
>
> # The heap size for the JobManager JVM
> jobmanager.heap.mb: 1024
>
> # The heap size for the TaskManager JVM
> taskmanager.heap.mb: 4096
>
> # The number of buf

Re: Retrieving a single element from a DataSet

2016-11-05 Thread Vasiliki Kalavri
Hi all,

@Wouter: I'm not sure I completely understand what you want to do, but
would broadcast variables [1] help?
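For reference, a minimal sketch of a broadcast variable with hypothetical names (values is any DataSet<Long>, env the ExecutionEnvironment):

DataSet<Double> threshold = env.fromElements(0.5);

DataSet<Long> filtered = values
        .filter(new RichFilterFunction<Long>() {
            private double t;

            @Override
            public void open(Configuration parameters) {
                // the broadcast set is available on every parallel instance
                List<Double> bc = getRuntimeContext().getBroadcastVariable("threshold");
                t = bc.get(0);
            }

            @Override
            public boolean filter(Long value) {
                return value > t;
            }
        })
        .withBroadcastSet(threshold, "threshold");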

@all: All-pairs-shortest-paths and betweenness centrality are very
challenging algorithms to implement efficiently in a distributed way. APSP
requires each vertex to store distances (or paths) for every other vertex
in the graph. AFAIK there's no scalable distributed algorithm to compute
these metrics. The solutions I'm aware of are (1) approximations and
sketches (e.g. spanners), (2) replicating the whole graph to several nodes
and compute paths in parallel, and (3) shared-memory implementations that
exploit multi-core parallelism. How are you planning to implement these in
Gelly?

Cheers,
-Vasia.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#broadcast-variables

On 4 November 2016 at 12:57, Greg Hogan  wrote:

> The tickets are in Flink's Jira:
>   https://issues.apache.org/jira/browse/FLINK-4965
>   https://issues.apache.org/jira/browse/FLINK-4966
>
> Are you looking to process temporal graphs with the DataStream API?
>
> On Fri, Nov 4, 2016 at 5:52 AM, otherwise777 
> wrote:
>
>> Cool, thnx for that,
>>
>> I tried searching for it in teh github but couldn't find it, do you have
>> the
>> url by any chance?
>> I'm going to try to implement such an algorithm for temporal graphs
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Retrieving-a-single
>> -element-from-a-DataSet-tp9731p9894.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


Re: Flink error: Too few memory segments provided

2016-10-21 Thread Vasiliki Kalavri
Hi,

On 21 October 2016 at 11:17, otherwise777  wrote:

> I tried increasing the taskmanager.network.numberOfBuffers to 4k and
> later to
> 8k. I'm not sure if my configuration file is even read; it's stored inside
> my IDE as follows:  http://prntscr.com/cx0vrx 
> I build the Flink program from the IDE and run it. I created several at
> different places to see if that helped but nothing changed on the error.
>

That's correct: if you're running your application through your IDE, the
config file is not read.
For passing configuration options to the local environment, please refer
to [1]. Alternatively, you can start Flink from the command line and submit
your job as a jar using the bin/flink command or using the web interface.
In that case, the configuration options that you set in flink-config.yaml
will be taken into account. Please refer to [2] for more details.
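For example (a sketch; the key matches the flink-conf.yaml option discussed here):

Configuration config = new Configuration();
// same option as taskmanager.network.numberOfBuffers in flink-conf.yaml
config.setInteger("taskmanager.network.numberOfBuffers", 8192);

ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);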


I hope this helps!
-Vasia.


[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/local_execution.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/local_setup.html



>
> AFAIK I'm using Flink 1.1.2 and Gelly 1.2-SNAPSHOT; here's my pom.xml:
> http://paste.thezomg.com/19868/41341147/
> 
> I see that the document I linked to points to an older config file; this is
> probably because it's the first hit on google, thanks for pointing it out
>
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Flink-error-Too-
> few-memory-segments-provided-tp9657p9667.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Flink error: Too few memory segments provided

2016-10-20 Thread Vasiliki Kalavri
Also pay attention to the Flink version you are using. The configuration
link you have provided points to an old version (0.8). Gelly wasn't part of
Flink then :)
You probably need to look in [1].

Cheers,
-Vasia.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html

On 20 October 2016 at 17:53, Greg Hogan  wrote:

> By default Flink only allocates 2048 network buffers (64 MiB at 32
> KiB/buffer). Have you increased the value for 
> taskmanager.network.numberOfBuffers
> in flink-conf.yaml?
>
> On Thu, Oct 20, 2016 at 11:24 AM, otherwise777 
> wrote:
>
>> I got this error in Gelly, which is a result of flink (i believe)
>>
>> Exception in thread "main"
>> org.apache.flink.runtime.client.JobExecutionException: Job execution
>> failed.
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$
>> handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.lifte
>> dTree1$1(Future.scala:24)
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(F
>> uture.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.
>> exec(AbstractDispatcher.scala:401)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.
>> java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>> ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>> l.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>> orkerThread.java:107)
>> Caused by: java.lang.IllegalArgumentException: Too few memory segments
>> provided. Hash Table needs at least 33 memory segments.
>> at
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.
>> (CompactingHashTable.java:206)
>> at
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.
>> (CompactingHashTable.java:191)
>> at
>> org.apache.flink.runtime.iterative.task.IterationHeadTask.in
>> itCompactingHashTable(IterationHeadTask.java:175)
>> at
>> org.apache.flink.runtime.iterative.task.IterationHeadTask.
>> run(IterationHeadTask.java:272)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>> k.java:351)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> I found a related topic:
>> http://mail-archives.apache.org/mod_mbox/flink-dev/201503.mb
>> ox/%3CCAK5ODX4KJ9TB4yJ=BcNwsozbOoXwdB7HM9qvWoa1P9HK-Gb-Dg@
>> mail.gmail.com%3E
>> But i don't think the problem is the same,
>>
>> The code is as follows:
>>
>> ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>> DataSource twitterEdges =
>> env.readCsvFile("./datasets/out.munmun_twitter_social").fieldDelimiter("
>> ").ignoreComments("%").types(Long.class, Long.class);
>> Graph graph = Graph.fromTuple2DataSet(twitterEdges, new
>> testinggraph.InitVertices(), env);
>> DataSet verticesWithCommunity = (DataSet)graph.run(new
>> LabelPropagation(1));
>> System.out.println(verticesWithCommunity.count());
>>
>> And it has only a couple of edges.
>>
>> I tried adding a config file in the project to add a couple of settings
>> found here:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.8/config.html
>> but
>> that didn't work either
>>
>> I have no idea how to fix this atm, it's not just the LabelPropagation
>> that
>> goes wrong, all gelly methods give this exact error if it's using an
>> iteration.
>>
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Flink-error-Too-few
>> -memory-segments-provided-tp9657.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


Re: [ANNOUNCE] Flink 1.1.0 Released

2016-08-08 Thread Vasiliki Kalavri
yoo-hoo finally announced 🎉
Thanks for managing the release Ufuk!

On 8 August 2016 at 18:36, Ufuk Celebi  wrote:

> The Flink PMC is pleased to announce the availability of Flink 1.1.0.
>
> On behalf of the PMC, I would like to thank everybody who contributed
> to the release.
>
> The release announcement:
> http://flink.apache.org/news/2016/08/08/release-1.1.0.html
>
> Release binaries:
> http://apache.openmirror.de/flink/flink-1.1.0/
>
> Please update your Maven dependencies to the new 1.1.0 version and
> update your binaries.
>
> – Ufuk
>


Re: sampling function

2016-07-11 Thread Vasiliki Kalavri
Hi Do,

Paris and Martha worked on sampling techniques for data streams on Flink
last year. If you want to implement your own samplers, you might find
Martha's master thesis helpful [1].
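
If all you need to get started is a simple Bernoulli sample of a DataStream, a
minimal sketch could look like the following (the "events" stream, the Event
type and the 1% rate are placeholders, not part of any Flink API):

import java.util.concurrent.ThreadLocalRandom;

// keep each element independently with probability p (Bernoulli sampling,
// so the sample size is not fixed)
final double p = 0.01;
DataStream<Event> sampled =
    events.filter(e -> ThreadLocalRandom.current().nextDouble() < p);

For a fixed-size sample (e.g. per window) you would need a different technique,
such as reservoir sampling.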

-Vasia.

[1]: http://kth.diva-portal.org/smash/get/diva2:910695/FULLTEXT01.pdf

On 11 July 2016 at 11:31, Kostas Kloudas 
wrote:

> Hi Do,
>
> In DataStream you can always implement your own
> sampling function, hopefully without too much effort.
>
> Adding such functionality it to the API could be a good idea.
> But given that in sampling there is no “one-size-fits-all”
> solution (as not every use case needs random sampling and not
> all random samplers fit to all workloads), I am not sure if we
> should start adding different sampling operators.
>
> Thanks,
> Kostas
>
> > On Jul 9, 2016, at 5:43 PM, Greg Hogan  wrote:
> >
> > Hi Do,
> >
> > DataSet provides a stable @Public interface. DataSetUtils is marked
> > @PublicEvolving which is intended for public use, has stable behavior,
> but
> > method signatures may change. It's also good to limit DataSet to common
> > methods whereas the utility methods tend to be used for specific
> > applications.
> >
> > I don't have the pulse of streaming but this sounds like a useful feature
> > that could be added.
> >
> > Greg
> >
> > On Sat, Jul 9, 2016 at 10:47 AM, Le Quoc Do  wrote:
> >
> >> Hi all,
> >>
> >> I'm working on approximate computing using sampling techniques. I
> >> recognized that Flink supports the sample function for Dataset
> >> (org/apache/flink/api/java/utils/DataSetUtils.java). I'm just wondering
> why
> >> you didn't merge the function to org/apache/flink/api/java/DataSet.java
> >> since the sample function works as a transformation operator?
> >>
> >> The second question is that are you planning to support the sample
> >> function for DataStream (within windows) since I did not see it in
> >> DataStream code ?
> >>
> >> Thank you,
> >> Do
> >>
>
>


Re: Graph with stream of updates

2016-07-07 Thread Vasiliki Kalavri
Hi Milindu,

as far as I know, there is currently no way to query the state from outside
of Flink. That's a feature in the roadmap, but I'm not sure when it will be
provided. Maybe someone else can give us an update.
For now, you can either implement your queries inside your streaming job and
output a result from time to time, e.g. using a time window (or slice() in
gelly-stream) or you could periodically dump your graph to a database like
neo4j and run your queries there.

Cheers,
-Vasia.

On 6 July 2016 at 08:07, agentmilindu  wrote:

> Hi Vasia and Ankur,
>
> I have the same need as Ankur where I want to create a graph from Twitter
> stream and query it. I got the streaming graph built from Twitter stream
> thanks to your Gelly Stream. Now I want to run queries( different graph
> algorithms ) on this streaming graph time to time, while it keeps building
> over the time independently.  I'm new to Flink, please help me with this.
>
> Thank you,
> Milindu.
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Graph-with-stream-of-updates-tp5166p7837.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Flink stops deploying jobs on normal iteration

2016-07-07 Thread Vasiliki Kalavri
Hi Truong,

I guess the problem is that you want to use topDistance as a broadcast set
inside the iteration? If I understand correctly this is a dataset with a
single value, right? Could you maybe compute it with an aggregator instead?
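
A rough sketch of that pattern (the "topDistance" name, the Point type and its
"distance" field are placeholders; DoubleSumAggregator and DoubleValue are in
org.apache.flink.api.common.aggregators and org.apache.flink.types):

IterativeDataSet<Point> loop = points.iterate(maxIterations);
loop.registerAggregator("topDistance", new DoubleSumAggregator());

DataSet<Point> updated = loop.map(new RichMapFunction<Point, Point>() {

    private DoubleSumAggregator agg;
    private DoubleValue previousTop;

    @Override
    public void open(Configuration parameters) {
        // aggregator to write to during the current superstep
        agg = getIterationRuntimeContext().getIterationAggregator("topDistance");
        // value aggregated during the previous superstep (not meaningful in superstep 1)
        previousTop = getIterationRuntimeContext()
            .getPreviousIterationAggregate("topDistance");
    }

    @Override
    public Point map(Point p) {
        agg.aggregate(p.distance);  // contribute to this superstep's aggregate
        return p;
    }
});

DataSet<Point> result = loop.closeWith(updated);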

-Vasia.

On 5 July 2016 at 21:48, Nguyen Xuan Truong  wrote:

> Hi Vasia,
>
> Thank you very much for your explanation :). When running with small
> maxIteration, the job graph that Flink executed was optimal. However, when
> maxIterations was large, Flink took a very long time to generate the job
> graph. The actual time to execute the jobs was very fast but the time to
> optimize and schedule the jobs was slow.
>
> Regarding your suggestion, I didn't use iterate/iterateDelta because I
> need to access the intermediate results within an iteration (the
> topDistance in my pseudo-code). As you said before, Flink does not support
> that feature, so I wondered if you have a workround for interate or
> iterateDelta?
>
> Thanks,
> Truong
>
> On Tue, Jul 5, 2016 at 8:46 PM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Hi Truong,
>>
>> I'm afraid what you're experiencing is to be expected. Currently, for
>> loops do not perform well in Flink since there is no support for caching
>> intermediate results yet. This has been a quite often requested feature
>> lately, so maybe it will be added soon :)
>> Until then, I suggest you try implementing your logic using iterate or
>> iterateDelta.
>>
>> Cheers,
>> -Vasia.
>>
>> On 5 July 2016 at 17:11, Nguyen Xuan Truong  wrote:
>>
>>> Hi,
>>>
>>> I have a Flink program which is similar to Kmeans algorithm. I use
>>> normal iteration(for loop) because Flink iteration does not allow to
>>> compute the intermediate results(in this case the topDistance) within one
>>> iteration. The problem is that my program only runs when maxIteration is
>>> small. When the maxIterations is big, Flink jobs inside the forloop are not
>>> scheduled, deployed or executed. The program hangs forever without any
>>> exception, error or log message.
>>>
>>> I ran the program on both local and cluster environments, having the
>>> same issue. I tried with smaller inputs (points and seeds), having the same
>>> issue.
>>>
>>> Does anybody have an idea about what is the problem? (Maybe the forloop
>>> creates many Flink jobs?)
>>>
>>> Here is the pseudo-code of my program:
>>>
>>> DataSet[Point] points = env.readTextFile(inputPoints)
>>> DataSet[Point] seeds = env.readTextFile(inputSeeds)
>>> discardNumber: Int = 100
>>> maxIterations: Int = 20 // maxIteration = 30 will hang the program and
>>> no Flink job inside the forloop jobs is deployed)
>>>
>>> for(iteration <- 1 to maxIterations) {
>>>
>>>   val intermediateSeeds = points
>>> .map()
>>> .withBroadcastSet(seeds, "seeds")
>>>
>>>  // topDistance contains only one double value.
>>>   var topDistance = intermediateSeeds
>>> .mapPartition()
>>> .first(discardNumber)
>>> .groupBy()
>>> .reduceGroup()
>>>
>>>   val newSeeds = intermediateSeeds
>>> .map()
>>> .groupBy(0)
>>> .reduce ().withBroadcastSet(topDistance, "topDistance")
>>> .map()
>>>
>>>   seeds = newSeeds
>>> }
>>>
>>> val finalResult = seeds.collect()
>>>
>>>
>>> Thanks,
>>> Truong
>>>
>>
>>
>


Re: Flink stops deploying jobs on normal iteration

2016-07-05 Thread Vasiliki Kalavri
Hi Truong,

I'm afraid what you're experiencing is to be expected. Currently, for loops
do not perform well in Flink since there is no support for caching
intermediate results yet. This has been a quite often requested feature
lately, so maybe it will be added soon :)
Until then, I suggest you try implementing your logic using iterate or
iterateDelta.

Cheers,
-Vasia.

On 5 July 2016 at 17:11, Nguyen Xuan Truong  wrote:

> Hi,
>
> I have a Flink program which is similar to Kmeans algorithm. I use normal
> iteration(for loop) because Flink iteration does not allow to compute the
> intermediate results(in this case the topDistance) within one iteration.
> The problem is that my program only runs when maxIteration is small. When
> the maxIterations is big, Flink jobs inside the forloop are not scheduled,
> deployed or executed. The program hangs forever without any exception,
> error or log message.
>
> I ran the program on both local and cluster environments, having the same
> issue. I tried with smaller inputs (points and seeds), having the same
> issue.
>
> Does anybody have an idea about what is the problem? (Maybe the forloop
> creates many Flink jobs?)
>
> Here is the pseudo-code of my program:
>
> DataSet[Point] points = env.readTextFile(inputPoints)
> DataSet[Point] seeds = env.readTextFile(inputSeeds)
> discardNumber: Int = 100
> maxIterations: Int = 20 // maxIteration = 30 will hang the program and no
> Flink job inside the forloop jobs is deployed)
>
> for(iteration <- 1 to maxIterations) {
>
>   val intermediateSeeds = points
> .map()
> .withBroadcastSet(seeds, "seeds")
>
>  // topDistance contains only one double value.
>   var topDistance = intermediateSeeds
> .mapPartition()
> .first(discardNumber)
> .groupBy()
> .reduceGroup()
>
>   val newSeeds = intermediateSeeds
> .map()
> .groupBy(0)
> .reduce ().withBroadcastSet(topDistance, "topDistance")
> .map()
>
>   seeds = newSeeds
> }
>
> val finalResult = seeds.collect()
>
>
> Thanks,
> Truong
>


Re: Parameters inside an iteration?

2016-07-05 Thread Vasiliki Kalavri
Hi Christoph,

if I understand what you want to do correctly, making your RichMapFunction
a standalone class and passing your object to the constructor should work.
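
Something along these lines (a minimal sketch; the class name, fields and the
trivial computation are placeholders, the point is only that the run-time
values travel with the function object):

public class ScaleMapper extends RichMapFunction<Double, Double> {

    private final double stepSize;
    private final int numDataPoints;

    public ScaleMapper(double stepSize, int numDataPoints) {
        // the fields are serialized with the function and shipped to the workers
        this.stepSize = stepSize;
        this.numDataPoints = numDataPoints;
    }

    @Override
    public Double map(Double value) {
        return stepSize * value / numDataPoints;
    }
}

// in the driver, once the run-time values are known:
// data.map(new ScaleMapper(s, numDP))

The same pattern works in Scala with a regular class whose constructor takes
the parameters, instead of a singleton object.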

Cheers,
-Vasia.

On 5 July 2016 at 18:16, Boden, Christoph 
wrote:

> Dear Flink Community,
>
>
> is there a compact and efficient way to get parameters that are known at
> run-time, but not compile-time inside an iteration? I tried the following:
>
> >define an object with the parameters:
>
>
> object  iterationVariables{
> var numDataPoints = 1
> var lambda = 0.2
> var stepSize = 0.01
> }
>
> ​>update it in the driver before starting the iteration:
>
> iterationVariables.numDataPoints = numDP
> iterationVariables.lambda = l
> iterationVariable.stepSize = s
>
>
> >and then use it inside the iteration - accessing it accordingly​:
>
>
> val resultingWeights = weightVector.iterate(numIterations) {
>
> weights => {
>
> val computeGradient = new RichMapFunction[LabeledPoint,
> DenseVector[Double]] {
> var originalW: DenseVector[Double] = _
>
> override def open(parameters: Configuration): Unit = {
> originalW = getRuntimeContext.getBroadcastVariable(WEIGHT_VECTOR).get(0)
> }
>
> override def map(dp: LabeledPoint): DenseVector[Double] = {
> val learning_rate: Double = iterationVariables.s /
> Math.sqrt(getIterationRuntimeContext().getSuperstepNumber.toDouble)
>
> val sumElement = (dp.features.toDenseVector * (dp.label -
> mlutils.logisticFunction(originalW, (dp.features)))
> - (iterationVariables.lambda / iterationVariables.numDataPoints.toDouble)
> * originalW
> ) * learning_rate
>
> sumElement
> }
> }
> val newWeights : DataSet[DenseVector[Double]] =
> weights.union(data.map(computeGradient).withBroadcastSet(weights,
> WEIGHT_VECTOR).reduce{_ + _}).reduce{_ + _}
> newWeights
> }
>
>
> This did work perfectly fine in local mode, however once deployed on an
> actual cluster, iterationVariables inside the iteration actually returns
> the values set in the original object (e.g. numDataPoints = 1) and not
> the updated value that was set later in the driver, ultimately leading to
> wrong results in the computation.
>
> So once again, is there a way to get parameters that will only be known at
> run-time inside an iteration?
>
> Best regards,
> Christoph Boden
>
>
>
>


Re: Send to all in gelly scatter

2016-06-15 Thread Vasiliki Kalavri
I forgot the reference [1] :S
Here it is:

[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html#iteration-abstractions-comparison

On 15 June 2016 at 20:59, Vasiliki Kalavri 
wrote:

> Hi Alieh,
>
> you can send a message from any vertex to any other vertex if you know the
> vertex ID. In [1] you will find a table that compares the update logic and
> communication scope for all gelly iteration models.
> Bear in mind though, that sending a message from all vertices to all other
> vertices is a very expensive operation and will not scale to large graphs.
> You might need to find another way to express your computation.
>
> Cheers,
> -Vasia.
>
> On 15 June 2016 at 17:42, Ufuk Celebi  wrote:
>
>> I think you can only send messages to the directly connected vertices.
>>
>> On Tue, Jun 14, 2016 at 5:30 PM, Alieh Saeedi 
>> wrote:
>> > Hi
>> > Is it possible to send a message to all vertices in gelly scatter? How
>> the
>> > Ids of all vertices can be found out?
>> >
>> > Thanks in advance
>>
>
>


Re: Send to all in gelly scatter

2016-06-15 Thread Vasiliki Kalavri
Hi Alieh,

you can send a message from any vertex to any other vertex if you know the
vertex ID. In [1] you will find a table that compares the update logic and
communication scope for all gelly iteration models.
Bear in mind though, that sending a message from all vertices to all other
vertices is a very expensive operation and will not scale to large graphs.
You might need to find another way to express your computation.

Cheers,
-Vasia.

On 15 June 2016 at 17:42, Ufuk Celebi  wrote:

> I think you can only send messages to the directly connected vertices.
>
> On Tue, Jun 14, 2016 at 5:30 PM, Alieh Saeedi  wrote:
> > Hi
> > Is it possible to send a message to all vertices in gelly scatter? How
> the
> > Ids of all vertices can be found out?
> >
> > Thanks in advance
>


Re: Gelly Scatter/Gather - Vertex update

2016-06-15 Thread Vasiliki Kalavri
Hi Alieh,

the scatter-gather model is built on top of Flink delta iterations exactly
for the reason to allow de-activating vertices that do not need to
participate in the computation of a certain superstep. If you want all
vertices to participate in all iterations of scatter-gather, you can send
dummy messages to keep them active. However, the scatter-gather model might
not be the correct model for your use-case and you might want to look into
Flink bulk iteration operators instead [1].
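
If you go the dummy-message route, a minimal sketch of the messaging function
could look like this (types are placeholders; MessagingFunction and Vertex are
in org.apache.flink.graph.spargel and org.apache.flink.graph):

public static class KeepAliveMessenger
        extends MessagingFunction<Long, Double, Double, Double> {

    @Override
    public void sendMessages(Vertex<Long, Double> vertex) {
        // unconditionally message all neighbors, so that every neighbor
        // receives at least one message and stays active in the next superstep
        sendMessageToAllNeighbors(vertex.getValue());
    }
}

Note that vertices without incoming edges would still become inactive, since
they never receive a message.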

-Vasia.

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#iteration-operators

On 14 June 2016 at 12:21, Alieh Saeedi  wrote:

> Hi everybody
> In Gelly scatter/gather when no message is sent to a vertex in one
> iteration, it will not enter the scatter function in next iteration. Why? I
> need all vertices enter the scatter function in all iterations, but some of
> them receive a message and will be updated.
>
> thanks in advance
>
>


Re: Gelly scatter/gather

2016-06-13 Thread Vasiliki Kalavri
Hi Alieh,

the VertexUpdateFunction and the MessagingFunction both have a method
"getSuperstepNumber()" which will give you the current iteration number.

-Vasia.

On 13 June 2016 at 18:06, Alieh Saeedi  wrote:

> Hi
> Is it possible to access iteration number in gelly scatter/gather?
>
> thanks in advance
> 
> 
>


Re: "Memory ran out" error when running connected components

2016-05-14 Thread Vasiliki Kalavri
Hey Rob,

On 13 May 2016 at 15:45, Arkay  wrote:

> Thanks for the link, I had experimented with those options, apart from
> taskmanager.memory.off-heap: true.  Turns out that allows it to run through
> happily!  I don't know if that is a peculiarity of a windows JVM, as I
> understand that setting is purely an efficiency improvement?
>

Great to hear that you solved your problem!
I'm not sure whether it's a windows peculiarity, maybe someone else could
clear this up.



>
> For your first question, yes I have a number of steps that get scheduled
> around the same time in the job, its not really avoidable unless there are
> optimizer hints to tell the system to only run certain steps on their own?
>

You could try setting the execution mode to BATCH/BATCH_FORCED with
"env.getConfig.setExecutionMode()".
It is typically more expensive than the default pipelined mode, but it
guarantees that no successive operations are run concurrently.
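
For example (ExecutionMode is org.apache.flink.api.common.ExecutionMode;
BATCH_FORCED is the stricter variant):

env.getConfig().setExecutionMode(ExecutionMode.BATCH);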



> I will try cutting the rest of the program out as a test however.
>
> Thanks very much for your help with this, and all your excellent work on
> Flink and Gelly :)
>
>
:))
Let us know if you run into any more problems or have questions.

Cheers,
-V.


Rob
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-ran-out-error-when-running-connected-components-tp6888p6904.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: "Memory ran out" error when running connected components

2016-05-13 Thread Vasiliki Kalavri
On 13 May 2016 at 14:28, Arkay  wrote:

> Hi Vasia,
>
> It seems to work OK up to about 50MB of input, and dies after that point.
> If i disable just this connected components step the rest of my program is
> happy with the full 1.5GB test dataset.  It seems to be specifically
> limited
> to GraphAlgorithms in my case.
>

So your program has other steps before/after the connected components
algorithm?
Could it be that you have some expensive operation that competes for memory
with the hash table?



>
> Do you know what the units are when it is saying Partition memory: 8388608?
> If it is bytes then it sounds like its using around 256MB per hash table of
> 32 partitions (which is then multiplied by number of task slots i guess).
>

Yes, that's bytes.



> Can this number be configured do you know?  Perhaps the windows version of
> the JVM is defaulting it to a lower value than on Linux?
>

By default, the hash table uses Flink's managed memory. That's 3.0GB in
your case (0.7 of the total memory by default).
You can change this fraction by setting the "taskmanager.memory.fraction"
in the configuration. See [1] for other managed memory options.
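
For example, in flink-conf.yaml (the value is only an illustration):

# give a larger share of the TaskManager memory to managed memory (default is 0.7)
taskmanager.memory.fraction: 0.8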

Hope this helps!
-Vasia.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#managed-memory



>
> Thanks,
> Rob
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-ran-out-error-when-running-connected-components-tp6888p6899.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: "Memory ran out" error when running connected components

2016-05-13 Thread Vasiliki Kalavri
Thanks for checking Rob! I don't see any reason for the job to fail with
this configuration and input size.
I have no experience running Flink on windows though, so I might be missing
something. Do you get a similar error with smaller inputs?

-Vasia.

On 13 May 2016 at 13:27, Arkay  wrote:

> Thanks Vasia,
>
> Apologies, yes by workers i mean I have set taskmanager.numberOfTaskSlots:
> 8
> and parallelism.default: 8 in flink-conf.yaml. I have also set
> taskmanager.heap.mb: 6500
>
> In the dashboard it is showing free memory as 5.64GB and Flink Managed
> Memory as 3.90GB.
>
> Thanks,
> Rob
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-ran-out-error-when-running-connected-components-tp6888p6895.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: "Memory ran out" error when running connected components

2016-05-13 Thread Vasiliki Kalavri
Hi Rob,


On 13 May 2016 at 11:22, Arkay  wrote:

> Hi to all,
>
> I’m aware there are a few threads on this, but I haven’t been able to solve
> an issue I am seeing and hoped someone can help.  I’m trying to run the
> following:
>
> val connectedNetwork = new org.apache.flink.api.scala.DataSet[Vertex[Long,
> Long]](
>   Graph.fromTuple2DataSet(inputEdges, vertexInitialiser, env)
> .run(new ConnectedComponents[Long, NullValue](100)))
>
> And hitting the error:
>
> java.lang.RuntimeException: Memory ran out. numPartitions: 32 minPartition:
> 8 maxPartition: 8 number of overflow segments: 122 bucketSize: 206 Overall
> memory: 19365888 Partition memory: 8388608
>  at
>
> org.apache.flink.runtime.operators.hash.CompactingHashTable.getNextBuffer(CompactingHashTable.java:753)
>  at
>
> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertBucketEntryFromStart(CompactingHashTable.java:546)
>  at
>
> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:423)
>  at
>
> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>  at
>
> org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
>  at
>
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
>  at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>  at java.lang.Thread.run(Unknown Source)
>
> I’m running Flink 1.0.3 on windows 10 using start-local.bat.  I have Xmx
> set
> to 6500MB, 8 workers, parallelism 8 and other memory settings left at
> default.
>

The start-local script will start a single JobManager and TaskManager.
What do you mean by 8 workers? Have you set the numberOfTaskSlots to 8? To
give all available memory to your TaskManager, you should set the
"taskmanager.heap.mb" configuration option in flink-conf.yaml. Can you open
the Flink dashboard at http://localhost:8081/ and check the configuration
of your taskmanager?

Cheers,
-Vasia.


> The inputEdges dataset contains 141MB of Long,Long pairs (which is around 6
> million edges).  ParentID is unique and always negative, ChildID is
> non-unique and always positive (simulating a bipartite graph)
>
> An example few rows:
> -91498683401,1738
> -135344401,5370
> -100260517801,7970
> -154352186001,12311
> -160265532002,12826
>
> The vast majority of the childIds are actually unique, and the most popular
> ID only occurs 10 times.
>
> VertexInitialiser just sets the vertex value to the id.
>
> Hopefully this is just a memory setting I’m not seeing for the hashTable as
> it dies almost instantly,  I don’t think it gets very far into the dataset.
> I understand that the CompactingHashTable cannot spill, but I’d be
> surprised
> if it needed to at these low volumes.
>
> Many thanks for any help!
>
> Rob
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-ran-out-error-when-running-connected-components-tp6888.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Scatter-Gather Iteration aggregators

2016-05-13 Thread Vasiliki Kalavri
Hi Lydia,

aggregators are automatically reset at the beginning of each iteration. As
far as I know, the reset() method is not supposed to be called from user
code. Also, in the code you pasted, you use "aggregator.getAggregate()".
Please, use the "getPreviousIterationAggregate()" method as I wrote above,
otherwise you won't get the correct value.

Cheers,
-Vasia.

On 13 May 2016 at 11:03, Lydia Ickler  wrote:

> Hi Vasia,
>
> okay I understand now :)
> So it works fine if I want to collect the sum of values.
> But what if I need to reset the DoubleSumAggregator back to 0 in order to
> then set it to a new value to save the absolute maximum?
> Please have a look at the code above.
>
> Any idea why it is not working?
>
>
> public static class VertexDistanceUpdater extends 
> VertexUpdateFunction {
>
> DoubleSumAggregator aggregator = new DoubleSumAggregator();
>
> public void preSuperstep() {
> // retrieve the Aggregator
> aggregator = getIterationAggregator("sumAggregator");
> }
>
> public void updateVertex(Vertex vertex, 
> MessageIterator inMessages) {
> double sum = 0;
> for (double msg : inMessages) {
> sum = sum + (msg);
> }
>
> if (Math.abs(sum) > Math.abs(aggregator.getAggregate().getValue())) {
>
> aggregator.reset();
>     aggregator.aggregate(sum);
>
> }
> setNewVertexValue(sum);
> }
> }
>
>
>
>
> Am 13.05.2016 um 09:25 schrieb Vasiliki Kalavri  >:
>
> Hi Lydia,
>
> an iteration aggregator combines all aggregates globally once per
> superstep and makes them available in the *next* superstep.
> Within each scatter-gather iteration, one MessagingFunction (scatter
> phase) and one VertexUpdateFunction (gather phase) are executed. Thus, if
> you set an aggregate value within one of those, the value will be available
> in the next superstep. You can retrieve it calling
> the getPreviousIterationAggregate() method.
> Let me know if that clears things up!
>
> -Vasia.
>
> On 13 May 2016 at 08:57, Lydia Ickler  wrote:
>
>> Hi Vasia,
>>
>> yes, but only independently within each Function or not?
>>
>> If I set the aggregator in VertexUpdateFunction then the newly set value
>> is not visible in the MessageFunction.
>> Or am I doing something wrong? I would like to have a shared aggregator
>> to normalize vertices.
>>
>>
>> Am 13.05.2016 um 08:04 schrieb Vasiliki Kalavri <
>> vasilikikala...@gmail.com>:
>>
>> Hi Lydia,
>>
>> registered aggregators through the ScatterGatherConfiguration are
>> accessible both in the VertexUpdateFunction and in the MessageFunction.
>>
>> Cheers,
>> -Vasia.
>>
>> On 12 May 2016 at 20:08, Lydia Ickler  wrote:
>>
>>> Hi,
>>>
>>> I have a question regarding the Aggregators of a Scatter-Gather
>>> Iteration.
>>> Is it possible to have a global aggregator that is accessible in 
>>> VertexUpdateFunction()
>>> and MessagingFunction() at the same time?
>>>
>>> Thanks in advance,
>>> Lydia
>>>
>>
>>
>>
>
>


Re: Scatter-Gather Iteration aggregators

2016-05-13 Thread Vasiliki Kalavri
Hi Lydia,

an iteration aggregator combines all aggregates globally once per superstep
and makes them available in the *next* superstep.
Within each scatter-gather iteration, one MessagingFunction (scatter phase)
and one VertexUpdateFunction (gather phase) are executed. Thus, if you set
an aggregate value within one of those, the value will be available in the
next superstep. You can retrieve it calling
the getPreviousIterationAggregate() method.
Let me know if that clears things up!
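
A minimal sketch of the pattern (names and types are placeholders):

public static class SumUpdater extends VertexUpdateFunction<Long, Double, Double> {

    private DoubleSumAggregator aggregator;

    @Override
    public void preSuperstep() {
        // the aggregator instance written to during *this* superstep
        aggregator = getIterationAggregator("sumAggregator");
    }

    @Override
    public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
        // the value that was aggregated during the *previous* superstep
        DoubleValue previousSum = getPreviousIterationAggregate("sumAggregator");

        double sum = 0;
        for (double msg : inMessages) {
            sum += msg;
        }
        aggregator.aggregate(sum);
        setNewVertexValue(sum);
    }
}

// when creating the iteration:
// ScatterGatherConfiguration config = new ScatterGatherConfiguration();
// config.registerAggregator("sumAggregator", new DoubleSumAggregator());
// and pass "config" as the extra argument to the scatter-gather iteration run call.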

-Vasia.

On 13 May 2016 at 08:57, Lydia Ickler  wrote:

> Hi Vasia,
>
> yes, but only independently within each Function or not?
>
> If I set the aggregator in VertexUpdateFunction then the newly set value
> is not visible in the MessageFunction.
> Or am I doing something wrong? I would like to have a shared aggregator
> to normalize vertices.
>
>
> Am 13.05.2016 um 08:04 schrieb Vasiliki Kalavri  >:
>
> Hi Lydia,
>
> registered aggregators through the ScatterGatherConfiguration are
> accessible both in the VertexUpdateFunction and in the MessageFunction.
>
> Cheers,
> -Vasia.
>
> On 12 May 2016 at 20:08, Lydia Ickler  wrote:
>
>> Hi,
>>
>> I have a question regarding the Aggregators of a Scatter-Gather
>> Iteration.
>> Is it possible to have a global aggregator that is accessible in 
>> VertexUpdateFunction()
>> and MessagingFunction() at the same time?
>>
>> Thanks in advance,
>> Lydia
>>
>
>
>


Re: Scatter-Gather Iteration aggregators

2016-05-12 Thread Vasiliki Kalavri
Hi Lydia,

registered aggregators through the ScatterGatherConfiguration are
accessible both in the VertexUpdateFunction and in the MessageFunction.

Cheers,
-Vasia.

On 12 May 2016 at 20:08, Lydia Ickler  wrote:

> Hi,
>
> I have a question regarding the Aggregators of a Scatter-Gather Iteration.
> Is it possible to have a global aggregator that is accessible in 
> VertexUpdateFunction()
> and MessagingFunction() at the same time?
>
> Thanks in advance,
> Lydia
>


Re: normalize vertex values

2016-05-12 Thread Vasiliki Kalavri
Hi Lydia,

there is no dedicated Gelly API method that performs normalization. If you
know the max value, then a mapVertices() would suffice. Otherwise, you can
get the Dataset of vertices with getVertices() and apply any kind of
operation supported by the Dataset API on it.
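
For the known-max case, a rough sketch (the Long/Double types and the constant
are only assumptions):

final double max = 42.0; // the known absolute maximum

Graph<Long, Double, Double> normalized = graph.mapVertices(
    new MapFunction<Vertex<Long, Double>, Double>() {
        @Override
        public Double map(Vertex<Long, Double> vertex) {
            // divide every vertex value by the absolute maximum
            return vertex.getValue() / max;
        }
    });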

Best,
-Vasia.
On May 12, 2016 10:31 AM, "Lydia Ickler"  wrote:

> Hi all,
>
> If I have a Graph g: Graph g
> and I would like to normalize all vertex values by the absolute max of all
> vertex values -> what API function would I choose?
>
> Thanks in advance!
> Lydia


Re: Bug while using Table API

2016-05-12 Thread Vasiliki Kalavri
Good to know :)

On 12 May 2016 at 11:16, Simone Robutti 
wrote:

> Ok, I tested it and it works on the same example. :)
>
> 2016-05-11 12:25 GMT+02:00 Vasiliki Kalavri :
>
>> Hi Simone,
>>
>> Fabian has pushed a fix for the streaming TableSources that removed the
>> Calcite Stream rules [1].
>> The reported error does not appear anymore with the current master. Could
>> you please also give it a try and verify that it works for you?
>>
>> Thanks,
>> -Vasia.
>>
>> [1]:
>> https://github.com/apache/flink/commit/7ed07933d2dd3cf41948287dc8fd79dbef902311
>>
>> On 4 May 2016 at 17:33, Vasiliki Kalavri 
>> wrote:
>>
>>> Thanks Simone! I've managed to reproduce the error. I'll try to figure
>>> out what's wrong and I'll keep you updated.
>>>
>>> -Vasia.
>>> On May 4, 2016 3:25 PM, "Simone Robutti" 
>>> wrote:
>>>
>>>> Here is the code:
>>>>
>>>> package org.example
>>>>
>>>> import org.apache.flink.api.scala._
>>>> import org.apache.flink.api.table.TableEnvironment
>>>>
>>>> object Job {
>>>>   def main(args: Array[String]) {
>>>> // set up the execution environment
>>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>>>
>>>>
>>>> val input = env.fromElements(WC("hello", 1), WC("hello", 1),
>>>> WC("ciao", 1))
>>>> val expr = tEnv.fromDataSet(input)
>>>> val result = expr
>>>>   .groupBy("word")
>>>>   .select("word , count.sum as count")
>>>> tEnv.toDataSet[WC](result).print()
>>>>
>>>> env.execute("Flink Scala API Skeleton")
>>>>   }
>>>> }
>>>>
>>>> case class WC(word:String,count:Int)
>>>>
>>>>
>>>>
>>
>


Re: Bug while using Table API

2016-05-11 Thread Vasiliki Kalavri
Hi Simone,

Fabian has pushed a fix for the streaming TableSources that removed the
Calcite Stream rules [1].
The reported error does not appear anymore with the current master. Could
you please also give it a try and verify that it works for you?

Thanks,
-Vasia.

[1]:
https://github.com/apache/flink/commit/7ed07933d2dd3cf41948287dc8fd79dbef902311

On 4 May 2016 at 17:33, Vasiliki Kalavri  wrote:

> Thanks Simone! I've managed to reproduce the error. I'll try to figure out
> what's wrong and I'll keep you updated.
>
> -Vasia.
> On May 4, 2016 3:25 PM, "Simone Robutti" 
> wrote:
>
>> Here is the code:
>>
>> package org.example
>>
>> import org.apache.flink.api.scala._
>> import org.apache.flink.api.table.TableEnvironment
>>
>> object Job {
>>   def main(args: Array[String]) {
>> // set up the execution environment
>> val env = ExecutionEnvironment.getExecutionEnvironment
>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>
>>
>> val input = env.fromElements(WC("hello", 1), WC("hello", 1),
>> WC("ciao", 1))
>> val expr = tEnv.fromDataSet(input)
>> val result = expr
>>   .groupBy("word")
>>   .select("word , count.sum as count")
>> tEnv.toDataSet[WC](result).print()
>>
>> env.execute("Flink Scala API Skeleton")
>>   }
>> }
>>
>> case class WC(word:String,count:Int)
>>
>>
>>


Re: Bug while using Table API

2016-05-04 Thread Vasiliki Kalavri
Thanks Simone! I've managed to reproduce the error. I'll try to figure out
what's wrong and I'll keep you updated.

-Vasia.
On May 4, 2016 3:25 PM, "Simone Robutti" 
wrote:

> Here is the code:
>
> package org.example
>
> import org.apache.flink.api.scala._
> import org.apache.flink.api.table.TableEnvironment
>
> object Job {
>   def main(args: Array[String]) {
> // set up the execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
>
> val input = env.fromElements(WC("hello", 1), WC("hello", 1),
> WC("ciao", 1))
> val expr = tEnv.fromDataSet(input)
> val result = expr
>   .groupBy("word")
>   .select("word , count.sum as count")
> tEnv.toDataSet[WC](result).print()
>
> env.execute("Flink Scala API Skeleton")
>   }
> }
>
> case class WC(word:String,count:Int)
>
>
>


Re: Bug while using Table API

2016-05-04 Thread Vasiliki Kalavri
Hi Simone,

I tried reproducing your problem with no luck.
I ran the WordCountTable example using sbt quickstart with Flink
1.1-SNAPSHOT and Scala 2.10 and it worked fine.
Can you maybe post the code you tried?

Thanks,
-Vasia.

On 4 May 2016 at 11:20, Simone Robutti  wrote:

> Hello,
>
> while trying my first job using the Table API I got blocked by this error:
>
> Exception in thread "main" java.lang.NoSuchFieldError: RULES
> at
> org.apache.flink.api.table.plan.rules.FlinkRuleSets$.(FlinkRuleSets.scala:148)
> at
> org.apache.flink.api.table.plan.rules.FlinkRuleSets$.(FlinkRuleSets.scala)
> at
> org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:212)
> at
> org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
> at org.example.Job$.main(Job.scala:64)
> at org.example.Job.main(Job.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
>
> This happens with every Job. Right now I'm trying with the WordCount
> example from the documentation. I'm using SBT quickstart project with
> version 1.1-SNAPSHOT. I can actually access the "StreamRules.RULES" field
> directly so it is there, but if I try to access DATASTREAM_OPT_RULES the
> same error as the one reported is raised.
>
> I tried both with Scala 2.11 and Scala 2.10.
>
>
>


Re: aggregation problem

2016-04-28 Thread Vasiliki Kalavri
Hi Riccardo,

can you please be a bit more specific? What do you mean by "it didn't
work"? Did it crash? Did it give you a wrong value? Something else?

-Vasia.

On 28 April 2016 at 16:52, Riccardo Diomedi 
wrote:

> Hi everybody
>
> In a DeltaIteration I have a DataSet>> where, at a
> certain point of the iteration, i need to count the total number of tuples
> and the total number of elements in the HashSet of each tuple, and then
> send both value to the ConvergenceCriterion function.
>
> Example:
>
> this is the content of my DataSet:
> (*1*,2,*[2,3]*)
> (*2*,1,*[3,4]*)
> (*3*,2,[*4,5]*)
>
> i should have:
> first count: *3* (1,2,3)
> second count: *4* (2,3,4,5)
>
> I tried to iterate over the dataset with a flatMap and use an
> aggregator, putting a HashSet into it (the aggregator), but it didn't work!
>
> Do you have any suggestion??
>
> thanks
>
> Riccardo
>


Re: Gelly CommunityDetection in scala example

2016-04-27 Thread Vasiliki Kalavri
Hi Trevor,

note that the community detection algorithm returns a new graph where the
vertex values correspond to the computed communities. Also, note that the
current implementation expects a graph with java.lang.Long vertex values
and java.lang.Double edge values.

The following should work:

import java.lang.Long
import java.lang.Double

...

val graph: Graph[Long, Long, Double] = ... // create your graph
val resultGraph = graph.run(new CommunityDetection[Long](30, 0.5))
resultGraph.getVertices.print()


Cheers,
-Vasia.

On 27 April 2016 at 17:41, Suneel Marthi  wrote:

> Recall facing a similar issue while trying to contribute a gelly-scala
> example to flink-training.
>
> See
> https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/scala/com/dataartisans/flinktraining/exercises/gelly_scala/PageRankWithEdgeWeights.scala
>
> On Wed, Apr 27, 2016 at 11:35 AM, Trevor Grant 
> wrote:
>
>> The following example in the scala shell worked as expected:
>>
>> import org.apache.flink.graph.library.LabelPropagation
>>
>> val verticesWithCommunity = graph.run(new LabelPropagation(30))
>>
>> // print the result
>> verticesWithCommunity.print
>>
>>
>> I tried to extend the example to use CommunityDetection:
>>
>> import org.apache.flink.graph.library.CommunityDetection
>>
>> val verticesWithCommunity = graph.run(new CommunityDetection(30, 0.5))
>>
>> // print the result
>> verticesWithCommunity.print
>>
>>
>> And meant the following error:
>> error: polymorphic expression cannot be instantiated to expected type;
>> found : [K]org.apache.flink.graph.library.CommunityDetection[K]
>> required: org.apache.flink.graph.GraphAlgorithm[Long,String,Double,?]
>> val verticesWithCommunity = graph.run(new CommunityDetection(30, 0.5))
>> ^
>>
>> I haven't been able to come up with a hack to make this work. Any
>> advice/bug?
>>
>> I investigated the code base a little, seems to be an issue with what
>> Graph.run expects to see vs. what LabelPropagation returns vs. what
>> CommunityDetection returns.
>>
>>
>>
>> Trevor Grant
>> Data Scientist
>> https://github.com/rawkintrevo
>> http://stackexchange.com/users/3002022/rawkintrevo
>> http://trevorgrant.org
>>
>> *"Fortunate is he, who is able to know the causes of things."  -Virgil*
>>
>>
>


Re: Job hangs

2016-04-27 Thread Vasiliki Kalavri
Hi Timur,

I've previously seen large batch jobs hang because of join deadlocks. We
should have fixed those problems, but we might have missed some corner
case. Did you check whether there was any cpu activity when the job hangs?
Can you try running htop on the taskmanager machines and see if they're
idle?

Cheers,
-Vasia.

On 27 April 2016 at 02:48, Timur Fayruzov  wrote:

> Robert, Ufuk, logs, execution plan and a screenshot of the console are in
> the archive:
> https://www.dropbox.com/s/68gyl6f3rdzn7o1/debug-stuck.tar.gz?dl=0
>
> Note that when I looked in the backpressure view I saw back pressure
> 'high' on following paths:
>
> Input->code_line:123,124->map->join
> Input->code_line:134,135->map->join
> Input->code_line:121->map->join
>
> Unfortunately, I was not able to take thread dumps nor heap dumps (neither
> kill -3, jstack nor jmap worked, some Amazon AMI problem I assume).
>
> Hope that helps.
>
> Please, let me know if I can assist you in any way. Otherwise, I probably
> would not be actively looking at this problem.
>
> Thanks,
> Timur
>
>
> On Tue, Apr 26, 2016 at 8:11 AM, Ufuk Celebi  wrote:
>
>> Can you please further provide the execution plan via
>>
>> env.getExecutionPlan()
>>
>>
>>
>> On Tue, Apr 26, 2016 at 4:23 PM, Timur Fayruzov
>>  wrote:
>> > Hello Robert,
>> >
>> > I observed progress for 2 hours(meaning numbers change on dashboard),
>> and
>> > then I waited for 2 hours more. I'm sure it had to spill at some point,
>> but
>> > I figured 2h is enough time.
>> >
>> > Thanks,
>> > Timur
>> >
>> > On Apr 26, 2016 1:35 AM, "Robert Metzger"  wrote:
>> >>
>> >> Hi Timur,
>> >>
>> >> thank you for sharing the source code of your job. That is helpful!
>> >> Its a large pipeline with 7 joins and 2 co-groups. Maybe your job is
>> much
>> >> more IO heavy with the larger input data because all the joins start
>> >> spilling?
>> >> Our monitoring, in particular for batch jobs is really not very
>> advanced..
>> >> If we had some monitoring showing the spill status, we would maybe see
>> that
>> >> the job is still running.
>> >>
>> >> How long did you wait until you declared the job hanging?
>> >>
>> >> Regards,
>> >> Robert
>> >>
>> >>
>> >> On Tue, Apr 26, 2016 at 10:11 AM, Ufuk Celebi  wrote:
>> >>>
>> >>> No.
>> >>>
>> >>> If you run on YARN, the YARN logs are the relevant ones for the
>> >>> JobManager and TaskManager. The client log submitting the job should
>> >>> be found in /log.
>> >>>
>> >>> – Ufuk
>> >>>
>> >>> On Tue, Apr 26, 2016 at 10:06 AM, Timur Fayruzov
>> >>>  wrote:
>> >>> > I will do it my tomorrow. Logs don't show anything unusual. Are
>> there
>> >>> > any
>> >>> > logs besides what's in flink/log and yarn container logs?
>> >>> >
>> >>> > On Apr 26, 2016 1:03 AM, "Ufuk Celebi"  wrote:
>> >>> >
>> >>> > Hey Timur,
>> >>> >
>> >>> > is it possible to connect to the VMs and get stack traces of the
>> Flink
>> >>> > processes as well?
>> >>> >
>> >>> > We can first have a look at the logs, but the stack traces will be
>> >>> > helpful if we can't figure out what the issue is.
>> >>> >
>> >>> > – Ufuk
>> >>> >
>> >>> > On Tue, Apr 26, 2016 at 9:42 AM, Till Rohrmann <
>> trohrm...@apache.org>
>> >>> > wrote:
>> >>> >> Could you share the logs with us, Timur? That would be very
>> helpful.
>> >>> >>
>> >>> >> Cheers,
>> >>> >> Till
>> >>> >>
>> >>> >> On Apr 26, 2016 3:24 AM, "Timur Fayruzov" <
>> timur.fairu...@gmail.com>
>> >>> >> wrote:
>> >>> >>>
>> >>> >>> Hello,
>> >>> >>>
>> >>> >>> Now I'm at the stage where my job seem to completely hang. Source
>> >>> >>> code is
>> >>> >>> attached (it won't compile but I think gives a very good idea of
>> what
>> >>> >>> happens). Unfortunately I can't provide the datasets. Most of them
>> >>> >>> are
>> >>> >>> about
>> >>> >>> 100-500MM records, I try to process on EMR cluster with 40 tasks
>> 6GB
>> >>> >>> memory
>> >>> >>> for each.
>> >>> >>>
>> >>> >>> It was working for smaller input sizes. Any idea on what I can do
>> >>> >>> differently is appreciated.
>> >>> >>>
>> >>> >>> Thans,
>> >>> >>> Timur
>> >>
>> >>
>> >
>>
>
>


Re: About flink stream table API

2016-04-26 Thread Vasiliki Kalavri
Hello,

the stream table API is currently under heavy development. So far, we
support selection, filtering, and union operations. For these operations we
use the stream SQL syntax of Apache Calcite [1]. This is as simple as
adding the "STREAM" keyword.

Registering a datastream table and running a stream SQL query works the
same way as for Datasets.
Here's a filtering example in Scala:

--
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val dataStream = env.addSource(...)
val t = dataStream.toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)

val sqlQuery = "SELECT STREAM * FROM MyTable WHERE a = 3"
val result = tEnv.sql(sqlQuery).toDataStream[Row]
---

You can find more details on our plans to support windows and aggregations
in the design document [2]. Feedback and ideas are very welcome!

Cheers,
-Vasia.

[1]: https://calcite.apache.org/docs/stream.html
[2]:
https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjPcp1h2TVqdI/edit?usp=sharing

On 26 April 2016 at 11:21, Zhangrucong  wrote:

> Hello:
>
>  I want to learn the flink stream API. The stream sql is the same with
> calcite?
>
>  In the following link, the examples of the Table API are for DataSets; where
> can I see a detailed introduction of the streaming Table API?
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html
>
>
>
>  Thanks in advance!
>
>
>
>
>


Re: Intermediate solution set of delta iteration

2016-03-23 Thread Vasiliki Kalavri
Hi Mengqi,

if what you are trying to do is output the solution set of every iteration
before the iteration has finished, then that is not possible, i.e. you cannot
output the solution set to a sink or another operator during the iteration.

However, you can add elements to the solution set and grow the dataset as
the iteration proceeds and retrieve it after it is finished.
Currently, you cannot union with the solution set, but a CoGroup could give
you the desired result.

Alternatively, you might want to look at bulk iterations (Iterate
abstraction) [1]. With these you can produce a completely new dataset after
every iteration.

I hope that helps!
-Vasia.

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html

On 20 March 2016 at 18:12, Mengqi Yang  wrote:

> Hi all,
>
> Are there any approaches here I could get intermediate solution sets from
> every delta iteration? I tried union but the compiler gave me the error:
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: Error: The only
> operations allowed on the solution set are Join and CoGroup.
>
>
> Best,
> Mengqi
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Intermediate-solution-set-of-delta-iteration-tp5656.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Memory ran out PageRank

2016-03-14 Thread Vasiliki Kalavri
Hi Ovidiu,

this option won't fix the problem if your system doesn't have enough memory
:)
It only defines whether the solution set is kept in managed memory or not.
For more iteration configuration options, take a look at the Gelly
documentation [1].

-Vasia.

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html#configuring-a-scatter-gather-iteration

On 14 March 2016 at 17:55, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> Thank you for this alternative.
> I don’t understand how the workaround will fix this on systems with
> limited memory and maybe larger graph.
>
> Running Connected Components on the same graph gives the same problem.
>
> IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED
> java.lang.RuntimeException: Memory ran out. Compaction failed.
> numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow
> segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory:
> 65601536 Message: Index: 32, Size: 31
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
> Best,
> Ovidiu
>
> On 14 Mar 2016, at 17:36, Martin Junghanns 
> wrote:
>
> Hi
>
> I think this is the same issue we had before on the list [1]. Stephan
> recommended the following workaround:
>
> A possible workaround is to use the option "setSolutionSetUnmanaged(true)"
> on the iteration. That will eliminate the fragmentation issue, at least.
>
>
> Unfortunately, you cannot set this when using graph.run(new PageRank(...))
>
> I created a Gist which shows you how to set this using PageRank
>
> https://gist.github.com/s1ck/801a8ef97ce374b358df
>
> Please let us know if it worked out for you.
>
> Cheers,
> Martin
>
> [1]
> http://mail-archives.apache.org/mod_mbox/flink-user/201508.mbox/%3CCAELUF_ByPAB%2BPXWLemPzRH%3D-awATeSz4sGz4v9TmnvFku3%3Dx3A%40mail.gmail.com%3E
>
> On 14.03.2016 16:55, Ovidiu-Cristian MARCU wrote:
>
> Hi,
>
> While running PageRank on a synthetic graph I run into this problem:
> Any advice on how should I proceed to overcome this memory issue?
>
> IterationHead(Vertex-centric iteration
> (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 |
> org.apache.flink.graph.library.PageRank$RankMesseng$
> java.lang.RuntimeException: Memory ran out. Compaction failed.
> numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow
> segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory:
> 50659328 Message: Index: 25, Size: 24
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469)
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414)
> at
> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
> at
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
> Thanks!
>
> Best,
> Ovidiu
>
>
>


Re: time spent for iteration

2016-03-09 Thread Vasiliki Kalavri
I think it would be useful to allow for easier retrieval of this
information.
Wouldn't it make sense to expose this to the web UI for example?
We actually had a discussion about this some time ago [1].

-Vasia.

[1]: https://issues.apache.org/jira/browse/FLINK-1759

On 9 March 2016 at 14:37, Gábor Gévay  wrote:

> Hello,
>
> If you increase the log level, you can see each step of the iteration
> separately in the log, with timestamps.
>
> Best,
> Gábor
>
>
>
>
>
> 2016-03-09 14:04 GMT+01:00 Riccardo Diomedi  >:
> > Is it possible to add a timer for the time spent per iteration when the
> iterate operator or the delta iterate operator is performed?
> >
> > thanks
> >
> > Riccardo
>


Re: Graph with stream of updates

2016-02-26 Thread Vasiliki Kalavri
Hi Ankur,

you can have custom state in your Flink operators, including a graph. There
is no graph state abstraction provided at the moment, but it shouldn't be
too hard for you to implement your own.
If your use-case only requires processing edge additions, then you
might want to take a look at gelly-stream [1]. It is a single-pass graph
streaming API, processing edge additions, and operating on graph summaries.
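
As a very rough sketch of custom graph state (the Tuple2<Long, Long> edge
stream is an assumption): the map below lives per parallel operator instance
and is not checkpointed, so for a single global graph you would run it with
parallelism 1 or partition the vertex space, and use Flink's state interfaces
if you need fault tolerance.

// consumes edge additions and maintains an in-memory adjacency list;
// emits the running edge count after every update, as an example "query"
public static class AdjacencyBuilder
        implements FlatMapFunction<Tuple2<Long, Long>, Long> {

    private final Map<Long, Set<Long>> adjacency = new HashMap<>();
    private long edgeCount = 0;

    @Override
    public void flatMap(Tuple2<Long, Long> edge, Collector<Long> out) {
        Set<Long> neighbors = adjacency.get(edge.f0);
        if (neighbors == null) {
            neighbors = new HashSet<>();
            adjacency.put(edge.f0, neighbors);
        }
        if (neighbors.add(edge.f1)) {
            edgeCount++;
        }
        out.collect(edgeCount);
    }
}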

Cheers,
-Vasia.

[1]: https://github.com/vasia/gelly-streaming

On 26 February 2016 at 14:59, Ankur Sharma 
wrote:

> Hello,
>
> Thanks for reply.
>
> I want to create a graph from stream and query it. You got it right.
>
> Stream may be edges that are getting added or removed from the graph.
>
> Is there a way to create an empty global graph that can be transformed
> using a stream of updates?
>
> Best,
> *Ankur Sharma*
> *3.15 E1.1 Universität des Saarlandes*
> *66123, Saarbrücken Germany*
> *Email: ankur.sha...@mpi-inf.mpg.de  *
> *an...@stud.uni-saarland.de *
>
> On 26 Feb 2016, at 14:55, Robert Metzger  wrote:
>
> Hi Ankur,
>
> Can you provide a bit more information on what you are trying to achieve?
>
> Do you want to keep a graph built from a stream of events within Flink
> and query that?
> Or do you want to change the dataflow graph of Flink while a job is
> running?
>
> Regards,
> Robert
>
>
> On Thu, Feb 25, 2016 at 11:19 PM, Ankur Sharma  > wrote:
>
>> Hello,
>>
>> Is it possible to create and update graph with streaming edge and vertex
>> data in flink?
>>
>> Best,
>> *Ankur Sharma*
>> *3.15 E1.1 Universität des Saarlandes*
>> *66123, Saarbrücken Germany*
>> *Email: ankur.sha...@mpi-inf.mpg.de  *
>> *an...@stud.uni-saarland.de *
>>
>>
>
>


Re: Machine Learning on Apache Fink

2016-01-09 Thread Vasiliki Kalavri
Hi Ashutosh,

Flink has a Machine Learning library, Flink-ML. You can find more
information and examples in the documentation [1].
The code is currently in the flink-staging repository. There is also
material on Slideshare that you might find interesting [2, 3, 4].

I hope this helps!
-Vasia.

[1]: https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/
[2]:
http://www.slideshare.net/tillrohrmann/machine-learning-with-apache-flink
[3]:
http://www.slideshare.net/TheodorosVasiloudis/flinkml-large-scale-machine-learning-with-apache-flink
[4]:
http://www.slideshare.net/tillrohrmann/computing-recommendations-at-extreme-scale-with-apache-flink-buzzwords-2015-48879155

On 9 January 2016 at 12:46, Ashutosh Kumar 
wrote:

> I see lot of study materials and even book available for ml on spark.
> Spark seems to be more mature for analytics related work as of now. Please
> correct me if I am wrong. As I have already built my collection and data
> pre processing layers on flink , I want to use Flink for analytics as well.
> Thanks in advance.
>
>
> Ashutosh
>
> On Sat, Jan 9, 2016 at 3:32 PM, Ashutosh Kumar  > wrote:
>
>> I am looking for some study material and examples on machine learning .
>> Are classification , recommendation and clustering libraries available ?
>> What is the timeline for Flink as backend for Mahout? I am building a meta
>> data driven framework over Flink . While building data collection and
>> transformation modules was cool , I am struggling since I started analytics
>> module. Thanks in advance.
>> Ashutosh
>>
>
>


Re: 2015: A Year in Review for Apache Flink

2015-12-31 Thread Vasiliki Kalavri
Happy new year everyone!
Looking forward to all the great things the Apache Flink community will
accomplish in 2016 :))

Greetings from snowy Greece!
-Vasia.

On 31 December 2015 at 04:22, Henry Saputra  wrote:

> Dear All,
>
> It is almost end of 2015 and it has been busy and great year for Apache
> Flink =)
>
> Robert Metzger had posted great blog summarizing Apache Flink grow for
> this year:
>
>   https://flink.apache.org/news/2015/12/18/a-year-in-review.html
>
> Happy New Year everyone and thanks for being part of this great community!
>
>
> Thanks,
>
> - Henry
>


Re: store and retrieve Graph object

2015-11-25 Thread Vasiliki Kalavri
Good to know :)

On 25 November 2015 at 21:44, Stefanos Antaris 
wrote:

> Hi,
>
> It works fine using this approach.
>
> Thanks,
> Stefanos
>
> On 25 Nov 2015, at 20:32, Vasiliki Kalavri 
> wrote:
>
> Hey,
>
> you can preprocess your data, create the vertices and store them to a
> file, like you would store any other Flink DataSet, e.g. with writeAsText.
>
> Then, you can create the graph by reading 2 datasets, like this:
>
> DataSet vertices = env.readTextFile("/path/to/vertices/")... // or
> your custom reading logic
> DataSet edges = ...
>
> Graph graph = Graph.fromDataSet(vertices, edges, env);
>
> Is this what you're looking for?
>
> Also, note that if you have a very large graph, you should avoid using
> collect() and fromCollection().
>
> -Vasia.
>
> On 25 November 2015 at 18:03, Stefanos Antaris  > wrote:
>
>> Hi Vasia,
>>
>> my graph object is the following:
>>
>> Graph graph = Graph.fromCollection(
>> edgeList.collect(), env);
>>
>> The vertex is a POJO not the value. So the problem is how could i store
>> and retrieve the vertex list?
>>
>> Thanks,
>> Stefanos
>>
>> On 25 Nov 2015, at 18:16, Vasiliki Kalavri 
>> wrote:
>>
>> Hi Stefane,
>>
>> let me know if I understand the problem correctly. The vertex values are
>> POJOs that you're somehow inferring from the edge list and this value
>> creation is what takes a lot of time? Since a graph is just a set of 2
>> datasets (vertices and edges), you could store the values to disk and have
>> a custom input format to read them into datasets. Would that work for you?
>>
>> -Vasia.
>>
>> On 25 November 2015 at 15:09, Stefanos Antaris <
>> antaris.stefa...@gmail.com> wrote:
>>
>>> Hi to all,
>>>
>>> i am working on a project with Gelly and i need to create a graph with
>>> billions of nodes. Although i have the edge list, the node in the Graph
>>> needs to be a POJO object, the construction of which takes long time in
>>> order to finally create the final graph. Is it possible to store the Graph
>>> object as a file and retrieve it whenever i want to run an experiment?
>>>
>>> Thanks,
>>> Stefanos
>>
>>
>>
>>
>
>


Re: store and retrieve Graph object

2015-11-25 Thread Vasiliki Kalavri
Hey,

you can preprocess your data, create the vertices and store them to a file,
like you would store any other Flink DataSet, e.g. with writeAsText.

Then, you can create the graph by reading 2 datasets, like this:

DataSet vertices = env.readTextFile("/path/to/vertices/")... // or
your custom reading logic
DataSet edges = ...

Graph graph = Graph.fromDataSet(vertices, edges, env);

Is this what you're looking for?

Also, note that if you have a very large graph, you should avoid using
collect() and fromCollection().

-Vasia.

On 25 November 2015 at 18:03, Stefanos Antaris 
wrote:

> Hi Vasia,
>
> my graph object is the following:
>
> Graph graph = Graph.fromCollection(
> edgeList.collect(), env);
>
> The vertex is a POJO not the value. So the problem is how could i store
> and retrieve the vertex list?
>
> Thanks,
> Stefanos
>
> On 25 Nov 2015, at 18:16, Vasiliki Kalavri 
> wrote:
>
> Hi Stefane,
>
> let me know if I understand the problem correctly. The vertex values are
> POJOs that you're somehow inferring from the edge list and this value
> creation is what takes a lot of time? Since a graph is just a set of 2
> datasets (vertices and edges), you could store the values to disk and have
> a custom input format to read them into datasets. Would that work for you?
>
> -Vasia.
>
> On 25 November 2015 at 15:09, Stefanos Antaris  > wrote:
>
>> Hi to all,
>>
>> i am working on a project with Gelly and i need to create a graph with
>> billions of nodes. Although i have the edge list, the node in the Graph
>> needs to be a POJO object, the construction of which takes long time in
>> order to finally create the final graph. Is it possible to store the Graph
>> object as a file and retrieve it whenever i want to run an experiment?
>>
>> Thanks,
>> Stefanos
>
>
>
>


Re: store and retrieve Graph object

2015-11-25 Thread Vasiliki Kalavri
Hi Stefane,

let me know if I understand the problem correctly. The vertex values are
POJOs that you're somehow inferring from the edge list and this value
creation is what takes a lot of time? Since a graph is just a set of 2
datasets (vertices and edges), you could store the values to disk and have
a custom input format to read them into datasets. Would that work for you?

-Vasia.

On 25 November 2015 at 15:09, Stefanos Antaris 
wrote:

> Hi to all,
>
> i am working on a project with Gelly and i need to create a graph with
> billions of nodes. Although i have the edge list, the node in the Graph
> needs to be a POJO object, the construction of which takes long time in
> order to finally create the final graph. Is it possible to store the Graph
> object as a file and retrieve it whenever i want to run an experiment?
>
> Thanks,
> Stefanos


Re: LDBC Graph Data into Flink

2015-11-24 Thread Vasiliki Kalavri
Great, thanks for sharing Martin!

On 24 November 2015 at 15:00, Martin Junghanns 
wrote:

> Hi,
>
> I wrote a short blog post about the ldbc-flink tool including a short
> overview of Flink and a Gelly example.
>
> http://ldbcouncil.org/blog/ldbc-and-apache-flink
>
> Best,
> Martin
>
> On 06.10.2015 11:00, Martin Junghanns wrote:
> > Hi Vasia,
> >
> > No problem. Sure, Gelly is just a map() call away :)
> >
> > Best,
> > Martin
> >
> > On 06.10.2015 10:53, Vasiliki Kalavri wrote:
> >> Hi Martin,
> >>
> >> thanks a lot for sharing! This is a very useful tool.
> >> I only had a quick look, but if we merge label and payload inside a
> Tuple2,
> >> then it should also be Gelly-compatible :)
> >>
> >> Cheers,
> >> Vasia.
> >>
> >> On 6 October 2015 at 10:03, Martin Junghanns 
> >> wrote:
> >>
> >>> Hi all,
> >>>
> >>> For our benchmarks with Flink, we are using a data generator provided
> by
> >>> the LDBC project (Linked Data Benchmark Council) [1][2]. The generator
> uses
> >>> MapReduce to create directed, labeled, attributed graphs that mimic
> >>> properties of real online social networks (e.g, degree distribution,
> >>> diameter). The output is stored in several files either local or in
> HDFS.
> >>> Each file represents a vertex, edge or multi-valued property class.
> >>>
> >>> I wrote a little tool, that parses and transforms the LDBC output into
> two
> >>> datasets representing vertices and edges. Each vertex has a unique id,
> a
> >>> label and payload according to the LDBC schema. Each edge has a unique
> id,
> >>> a label, source and target vertex IDs and also payload according to the
> >>> schema.
> >>>
> >>> I thought this may be useful for others so I put it on GitHub [2]. It
> >>> currently uses Flink 0.10-SNAPSHOT as it depends on some fixes made in
> >>> there.
> >>>
> >>> Best,
> >>> Martin
> >>>
> >>> [1] http://ldbcouncil.org/
> >>> [2] https://github.com/ldbc/ldbc_snb_datagen
> >>> [3] https://github.com/s1ck/ldbc-flink-import
> >>>
> >>
>


Re: Creating a representative streaming workload

2015-11-16 Thread Vasiliki Kalavri
Hi,

thanks Nick and Ovidiu for the links!

Just to clarify, we're not looking into creating a generic streaming
benchmark. We have quite limited time and resources for this project. What
we want is to decide on a set of 3-4 _common_ streaming applications. To
give you an idea, for the batch workload, we will pick something like a
grep, one relational application, a graph algorithm, and an ML algorithm.

Cheers,
-Vasia.

On 16 November 2015 at 19:25, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> Regarding Flink vs Spark / Storm you can check here:
> http://www.sparkbigdata.com/102-spark-blog-slim-baltagi/14-results-of-a-benchmark-between-apache-flink-and-apache-spark
>
> Best regards,
> Ovidiu
>
> On 16 Nov 2015, at 15:21, Vasiliki Kalavri 
> wrote:
>
> Hello squirrels,
>
> with some colleagues and students here at KTH, we have started 2 projects
> to evaluate (1) performance and (2) behavior in the presence of memory
> interference in cloud environments, for Flink and other systems. We want to
> provide our students with a workload of representative applications for
> testing.
>
> While for batch applications, it is quite clear to us what classes of
> applications are widely used and how to create a workload of different
> types of applications, we are not quite sure about the streaming workload.
>
> That's why, we'd like your opinions! If you're using Flink streaming in
> your company or your project, we'd love your input even more :-)
>
> What kind of applications would you consider as "representative" of a
> streaming workload? Have you run any experiments to evaluate Flink versus
> Spark, Storm etc.? If yes, would you mind sharing your code with us?
>
> We will of course be happy to share our results with everyone after we
> have completed our study.
>
> Thanks a lot!
> -Vasia.
>
>
>


Creating a representative streaming workload

2015-11-16 Thread Vasiliki Kalavri
Hello squirrels,

with some colleagues and students here at KTH, we have started 2 projects
to evaluate (1) performance and (2) behavior in the presence of memory
interference in cloud environments, for Flink and other systems. We want to
provide our students with a workload of representative applications for
testing.

While for batch applications, it is quite clear to us what classes of
applications are widely used and how to create a workload of different
types of applications, we are not quite sure about the streaming workload.

That's why, we'd like your opinions! If you're using Flink streaming in
your company or your project, we'd love your input even more :-)

What kind of applications would you consider as "representative" of a
streaming workload? Have you run any experiments to evaluate Flink versus
Spark, Storm etc.? If yes, would you mind sharing your code with us?

We will of course be happy to share our results with everyone after we have
completed our study.

Thanks a lot!
-Vasia.


Re: Zeppelin Integration

2015-11-04 Thread Vasiliki Kalavri
Great tutorial! Thanks a lot ^^

On 4 November 2015 at 17:12, Leonard Wolters  wrote:

> Indeed very nice! Thanks
> On Nov 4, 2015 5:04 PM, "Till Rohrmann"  wrote:
>
>> Really cool tutorial Trevor :-)
>>
>> On Wed, Nov 4, 2015 at 3:26 PM, Robert Metzger 
>> wrote:
>>
>>> For those interested, Trevor wrote a blog post describing how to setup
>>> Spark, Flink and Zeppelin, both locally and on clusters:
>>> http://trevorgrant.org/2015/11/03/apache-casserole-a-delicious-big-data-recipe-for-the-whole-family/
>>> Thanks Trevor for the great tutorial!
>>>
>>> On Thu, Oct 22, 2015 at 4:23 PM, Till Rohrmann 
>>> wrote:
>>>
 Hi Trevor,

 that’s actually my bad since I only tested my branch against a remote
 cluster. I fixed the problem (not properly starting the
 LocalFlinkMiniCluster) so that you can now use Zeppelin also in local
 mode. Just check out my branch again.

 Cheers,
 Till
 ​

 On Wed, Oct 21, 2015 at 10:00 PM, Trevor Grant <
 trevor.d.gr...@gmail.com> wrote:

> Hey Till,
>
> I cloned your branch of Zeplin and while it will compile, it fails
> tests on timeout, which consequently was the same issue I was having when
> trying to use Zeppelin.
>
> Ideas?
>
>
> ---
> Test set: org.apache.zeppelin.flink.FlinkInterpreterTest
>
> ---
> Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed:
> 100.347 sec <<< FAILURE! - in 
> org.apache.zeppelin.flink.FlinkInterpreterTest
> org.apache.zeppelin.flink.FlinkInterpreterTest  Time elapsed: 100.347
> sec  <<< ERROR!
> java.util.concurrent.TimeoutException: Futures timed out after [10
> milliseconds]
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.getLeaderIndex(FlinkMiniCluster.scala:171)
> at
> org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.getLeaderRPCPort(LocalFlinkMiniCluster.scala:132)
> at
> org.apache.zeppelin.flink.FlinkInterpreter.getPort(FlinkInterpreter.java:136)
> at
> org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:98)
> at
> org.apache.zeppelin.flink.FlinkInterpreterTest.setUp(FlinkInterpreterTest.java:42)
>
> org.apache.zeppelin.flink.FlinkInterpreterTest  Time elapsed: 100.347
> sec  <<< ERROR!
> java.lang.NullPointerException: null
> at
> org.apache.zeppelin.flink.FlinkInterpreter.close(FlinkInterpreter.java:221)
> at
> org.apache.zeppelin.flink.FlinkInterpreterTest.tearDown(FlinkInterpreterTest.java:48)
>
>
>
> Trevor Grant
> Data Scientist
> https://github.com/rawkintrevo
> http://stackexchange.com/users/3002022/rawkintrevo
>
> *"Fortunate is he, who is able to know the causes of things."  -Virgil*
>
>
> On Wed, Oct 21, 2015 at 11:57 AM, Till Rohrmann 
> wrote:
>
>> Hi Trevor,
>>
>> in order to use Zeppelin with a different Flink version in local
>> mode, meaning that Zeppelin starts a LocalFlinkMiniCluster when
>> executing your jobs, you have to build Zeppelin and change the
>> flink.version property in the zeppelin/flink/pom.xml file to the
>> version you want to use.
>>
>> If you want to let Zeppelin submit jobs to a remote cluster, you
>> should build Zeppelin with the version of your cluster. That’s because
>> internally Zeppelin will use this version to construct a JobGraph
>> which is then submitted to the cluster. In order to configure the remote
>> cluster, you have to go the *Interpreter* page and scroll down to
>> the *flink* section. There you have to specify the address of your
>> cluster under *host* and the port under *port*. This should then be
>> used to submit jobs to the Flink cluster.
>>
>> I hope this answers your question.
>>
>> Btw: If you want to use Zeppelin with the latest Flink 0.10-SNAPSHOT
>> version, you should checkout my branch
>> https://github.com/tillrohrmann/incubator-zeppelin/tree/flink-0.10-SNAPSHOT
>> where I’ve made the necessary changes.
>>
>> Cheers,
>> Till
>> ​
>>
>> On Wed, Oct 21, 2015 at 5:00 PM, Trevor Grant <
>> trevor.d.gr...@gmail.com> wrote:
>>
>>> I'm setting up some Flink/Spark/Zeppelin at work.  Spark+Zeppelin
>>> seems to be relatively well supported and configurable but the Flink is 
>>> not
>

Re: Creating Graphs from DataStream in Flink Gelly

2015-11-02 Thread Vasiliki Kalavri
Hi Ufuoma,

Gelly doesn't support dynamic streaming graphs yet.
The project Andra has linked to is a prototype for *one-pass* streaming
graph analytics, i.e. no graph state is maintained.

If you would like to keep and maintain the graph state in your streaming
program, you would have to implement it yourself.
Take a look at the relevant section of the programming guide [1] and please
let us know if you have questions!
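To give a flavour of what "implement it yourself" could look like, here is a
very rough, purely illustrative sketch: it keys the edge stream by source
vertex and keeps per-vertex neighbor sets in plain operator memory inside a
RichFlatMapFunction (so it is not fault-tolerant), emitting the current
out-degree as an example result. The Tuple2 edge-event type is only a
placeholder:

DataStream<Tuple2<Long, Long>> edgeEvents = ...; // (source, target) edge additions

DataStream<Tuple2<Long, Integer>> degrees = edgeEvents
    .keyBy(0) // all events for the same source vertex go to the same parallel instance
    .flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Integer>>() {

        // adjacency sets for the vertices handled by this instance;
        // held in a plain HashMap, i.e. NOT checkpointed
        private final Map<Long, Set<Long>> neighbors = new HashMap<Long, Set<Long>>();

        @Override
        public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Integer>> out) {
            Set<Long> targets = neighbors.get(edge.f0);
            if (targets == null) {
                targets = new HashSet<Long>();
                neighbors.put(edge.f0, targets);
            }
            targets.add(edge.f1);
            // emit the updated out-degree of the source vertex
            out.collect(new Tuple2<Long, Integer>(edge.f0, targets.size()));
        }
    });

For anything that has to survive failures, you would use the state interfaces
described in the guide linked below instead of the plain HashMap.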

Cheers,
-Vasia.

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#working-with-state

On 2 November 2015 at 14:39, Andra Lungu  wrote:

> Hi,
>
> There is a separate project related to graph streaming. It's called
> gelly-streaming.
> And, if you look here:
>
> https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/GraphStream.java
>
> You can find a constructor which creates a graph from a DataStream of
> edges.
>
> Gelly itself cannot do that, for now.
>
> Hope this helps!
> Andra
>
> On Mon, Nov 2, 2015 at 3:35 PM, UFUOMA IGHOROJE 
> wrote:
>
>> Is there some support for creating Graphs from DataStream in Flink Gelly?
>> My use case is to create dynamic graphs, where the graph topology can be
>> updated from a stream of data. From the API, all I can find is creating
>> graphs from DataSets.
>>
>> Best,
>>
>> Ufuoma
>>
>
>


Re: compile flink-gelly-scala using sbt

2015-10-28 Thread Vasiliki Kalavri
Are you using 0.10-SNAPSHOT for this? Because in 0.9.1 this method
(getJavaEnv()) indeed doesn't exist ;)

On 28 October 2015 at 16:51, Le Quoc Do  wrote:

> From Graph.scala:
>
> *  def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation :
> ClassTag]*
> *  (edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K,
> NullValue, EV] = {*
> *wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet, env.getJavaEnv))*
> *  }*
>
>
> ==
> Le Quoc Do
> Dresden University of Technology
> Faculty of Computer Science
> Institute for System Architecture
> Systems Engineering Group
> 01062 Dresden
> E-Mail: d...@se.inf.tu-dresden.de
>
> On Wed, Oct 28, 2015 at 4:41 PM, Theodore Vasiloudis <
> theodoros.vasilou...@gmail.com> wrote:
>
>> Could you share a minimal code example where this happens?
>>
>> On Wed, Oct 28, 2015 at 4:22 PM, Le Quoc Do  wrote:
>>
>>> Hi Theodore and Vasia,
>>>
>>> Thanks for your reply.
>>>
>>> I can compile my code by add dependency jars manually.
>>>
>>> Yes, in my code, I already import Flink-scala (import 
>>> org.apache.flink.api.scala._).
>>> However when I run my code,
>>> I get the following error:
>>>
>>> *ava.lang.NoSuchMethodError:
>>> org.apache.flink.api.scala.ExecutionEnvironment.getJavaEnv()Lorg/apache/flink/api/java/ExecutionEnvironment;
>>> at org.apache.flink.graph.scala.Graph$.fromDataSet(Graph.scala:53)*
>>>
>>> any suggestions?
>>>
>>> Thanks,
>>> Do
>>>
>>> ==
>>> Le Quoc Do
>>> Dresden University of Technology
>>> Faculty of Computer Science
>>> Institute for System Architecture
>>> Systems Engineering Group
>>> 01062 Dresden
>>> E-Mail: d...@se.inf.tu-dresden.de
>>>
>>> On Wed, Oct 28, 2015 at 3:50 PM, Theodore Vasiloudis <
>>> theodoros.vasilou...@gmail.com> wrote:
>>>
>>>> Your build.sbt seems correct.
>>>> It might be that you are missing some basic imports.
>>>>
>>>> In your code have you imported
>>>>
>>>> import org.apache.flink.api.scala._
>>>>
>>>> ?
>>>>
>>>>
>>>> On Tue, Oct 27, 2015 at 8:45 PM, Vasiliki Kalavri <
>>>> vasilikikala...@gmail.com> wrote:
>>>>
>>>>> Hi Do,
>>>>>
>>>>> I don't really have experience with sbt, but one thing that might
>>>>> cause problems is that your dependencies point to Flink 0.9.1 and
>>>>> gelly-scala wasn't part of that release. You can either try to use the
>>>>> 0.10-SNAPSHOT or wait a few days for the 0.10 release.
>>>>>
>>>>> Cheers,
>>>>> -Vasia.
>>>>>
>>>>> On 27 October 2015 at 18:05, Le Quoc Do  wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I try to compile flink-gelly-scala using sbt. However, I got the
>>>>>> following error:
>>>>>>
>>>>>> *error]
>>>>>> /home/ubuntu/git/flink-learning/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala:42:
>>>>>> value getJavaEnv is not a member of
>>>>>> org.apache.flink.api.scala.ExecutionEnvironment*
>>>>>> *[error] wrapGraph(jg.Graph.fromDataSet[K, VV,
>>>>>> EV](vertices.javaSet, edges.javaSet, env.getJavaEnv))*
>>>>>> *[error]
>>>>>>^*
>>>>>> *[error]
>>>>>> /home/ubuntu/git/flink-learning/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala:51:
>>>>>> value getJavaEnv is not a member of
>>>>>> org.apache.flink.api.scala.ExecutionEnvironment*
>>>>>> *[error] wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet,
>>>>>> env.getJavaEnv))*
>>>>>>
>>>>>> The content of built.sbt file:
>>>>>>
>>>>>> *name := "flink-graph-metrics"*
>>>>>>
>>>>>> *version := "1.0"*
>>>>>>
>>>>>> *scalaVersion := "2.11.6"*
>>>>>>
>>>>>> *libraryDependencies ++= Seq("org.apache.flink" % "flink-scala" %
>>>>>> "0.9.1", "org.apache.flink" % "flink-clients" % "0.9.1", 
>>>>>> "org.apache.flink"
>>>>>> % "flink-gelly"  % "0.9.1")*
>>>>>>
>>>>>> *fork in run := true*
>>>>>>
>>>>>>
>>>>>> Do you know how to fix this problem?
>>>>>>
>>>>>> Thanks,
>>>>>> Do
>>>>>>
>>>>>> ==
>>>>>> Le Quoc Do
>>>>>> Dresden University of Technology
>>>>>> Faculty of Computer Science
>>>>>> Institute for System Architecture
>>>>>> Systems Engineering Group
>>>>>> 01062 Dresden
>>>>>> E-Mail: d...@se.inf.tu-dresden.de
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: compile flink-gelly-scala using sbt

2015-10-27 Thread Vasiliki Kalavri
Hi Do,

I don't really have experience with sbt, but one thing that might cause
problems is that your dependencies point to Flink 0.9.1 and gelly-scala
wasn't part of that release. You can either try to use the 0.10-SNAPSHOT or
wait a few days for the 0.10 release.

Cheers,
-Vasia.

On 27 October 2015 at 18:05, Le Quoc Do  wrote:

> Hi,
>
> I try to compile flink-gelly-scala using sbt. However, I got the following
> error:
>
> *error]
> /home/ubuntu/git/flink-learning/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala:42:
> value getJavaEnv is not a member of
> org.apache.flink.api.scala.ExecutionEnvironment*
> *[error] wrapGraph(jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet,
> edges.javaSet, env.getJavaEnv))*
> *[error]
>  ^*
> *[error]
> /home/ubuntu/git/flink-learning/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala:51:
> value getJavaEnv is not a member of
> org.apache.flink.api.scala.ExecutionEnvironment*
> *[error] wrapGraph(jg.Graph.fromDataSet[K, EV](edges.javaSet,
> env.getJavaEnv))*
>
> The content of built.sbt file:
>
> *name := "flink-graph-metrics"*
>
> *version := "1.0"*
>
> *scalaVersion := "2.11.6"*
>
> *libraryDependencies ++= Seq("org.apache.flink" % "flink-scala" % "0.9.1",
> "org.apache.flink" % "flink-clients" % "0.9.1", "org.apache.flink" %
> "flink-gelly"  % "0.9.1")*
>
> *fork in run := true*
>
>
> Do you know how to fix this problem?
>
> Thanks,
> Do
>
> ==
> Le Quoc Do
> Dresden University of Technology
> Faculty of Computer Science
> Institute for System Architecture
> Systems Engineering Group
> 01062 Dresden
> E-Mail: d...@se.inf.tu-dresden.de
>


Re: LDBC Graph Data into Flink

2015-10-06 Thread Vasiliki Kalavri
Hi Martin,

thanks a lot for sharing! This is a very useful tool.
I only had a quick look, but if we merge label and payload inside a Tuple2,
then it should also be Gelly-compatible :)

Cheers,
Vasia.

On 6 October 2015 at 10:03, Martin Junghanns 
wrote:

> Hi all,
>
> For our benchmarks with Flink, we are using a data generator provided by
> the LDBC project (Linked Data Benchmark Council) [1][2]. The generator uses
> MapReduce to create directed, labeled, attributed graphs that mimic
> properties of real online social networks (e.g, degree distribution,
> diameter). The output is stored in several files either local or in HDFS.
> Each file represents a vertex, edge or multi-valued property class.
>
> I wrote a little tool, that parses and transforms the LDBC output into two
> datasets representing vertices and edges. Each vertex has a unique id, a
> label and payload according to the LDBC schema. Each edge has a unique id,
> a label, source and target vertex IDs and also payload according to the
> schema.
>
> I thought this may be useful for others so I put it on GitHub [2]. It
> currently uses Flink 0.10-SNAPSHOT as it depends on some fixes made in
> there.
>
> Best,
> Martin
>
> [1] http://ldbcouncil.org/
> [2] https://github.com/ldbc/ldbc_snb_datagen
> [3] https://github.com/s1ck/ldbc-flink-import
>


Re: Too few memory segments provided exception

2015-07-20 Thread Vasiliki Kalavri
Hi Shivani,

the Jaccard example is implemented in Giraph, and therefore uses iterations.
However, in Gelly we are not forced to do that for non-iterative
computations.

I see that there is some confusion with the implementation specifics.
Let me try to write down some skeleton code / detailed description on how
to do this properly in Gelly and let's move this discussion to the
corresponding issue.

Cheers,
-Vasia.

On 20 July 2015 at 16:45, Shivani Ghatge  wrote:

> Also the example of Jaccard that you had linked me to used VertexCentric
> configuration which I understand is because that api only uses
> VertexCentricIteration for all the operations? But I think that is the best
> way in order to know what neighbors belong to the BloomFilter?
>
> On Mon, Jul 20, 2015 at 3:43 PM, Shivani Ghatge 
> wrote:
>
>> Hello Vasia,
>>
>> As I had mentioned before, I need a BloomFilter as well as a HashSet for
>> the approximation to work. In the exact solution I am getting two HashSets
>> and comparing them. In approximate version, if we get two BloomFilters then
>> we have no way to compare the neighborhood sets.
>>
>> I thought we agreed that the BloomFilters are to be sent as messages to
>> the vertices?
>>
>> The exact version is passing all the tests.
>>
>> On removing the final GroupReduce the program is working but I need it to
>> add the Partial Adamic Adar edges weights.
>>
>> On Mon, Jul 20, 2015 at 3:15 PM, Vasiliki Kalavri <
>> vasilikikala...@gmail.com> wrote:
>>
>>> Hi Shivani,
>>>
>>> why are you using a vertex-centric iteration to compute the approximate
>>> Adamic-Adar?
>>> It's not an iterative computation :)
>>>
>>> In fact, it should be as complex (in terms of operators) as the exact
>>> Adamic-Adar, only more efficient because of the different neighborhood
>>> representation. Are you having the same problem with the exact computation?
>>>
>>> Cheers,
>>> Vasia.
>>>
>>> On 20 July 2015 at 14:41, Maximilian Michels  wrote:
>>>
>>>> Hi Shivani,
>>>>
>>>> The issue is that by the time the Hash Join is executed, the
>>>> MutableHashTable cannot allocate enough memory segments. That means that
>>>> your other operators are occupying them. It is fine that this also occurs
>>>> on Travis because the workers there have limited memory as well.
>>>>
>>>> Till suggested to change the memory fraction through the
>>>> ExuectionEnvironment. Can you try that?
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge 
>>>> wrote:
>>>>
>>>>> Hello Maximilian,
>>>>>
>>>>> Thanks for the suggestion. I will use it to check the program. But
>>>>> when I am creating a PR for the same implementation with a Test, I am
>>>>> getting the same error even on Travis build. So for that what would be the
>>>>> solution?
>>>>>
>>>>> Here is my PR https://github.com/apache/flink/pull/923
>>>>> And here is the Travis build status
>>>>> https://travis-ci.org/apache/flink/builds/71695078
>>>>>
>>>>> Also on the IDE it is working fine in Collection execution mode.
>>>>>
>>>>> Thanks and Regards,
>>>>> Shivani
>>>>>
>>>>> On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels 
>>>>> wrote:
>>>>>
>>>>>> Hi Shivani,
>>>>>>
>>>>>> Flink doesn't have enough memory to perform a hash join. You need to
>>>>>> provide Flink with more memory. You can either increase the
>>>>>> "taskmanager.heap.mb" config variable or set 
>>>>>> "taskmanager.memory.fraction"
>>>>>> to some value greater than 0.7 and smaller then 1.0. The first config
>>>>>> variable allocates more overall memory for Flink; the latter changes the
>>>>>> ratio between Flink managed memory (e.g. for hash join) and user memory
>>>>>> (for you functions and Gelly's code).
>>>>>>
>>>>>> If you run this inside an IDE, the memory is configured automatically
>>>>>> and you don't have control over that at the moment. You could, however,
>>>>>> start a local cluster (./bin/start-local) after you adjusted your
>>>>

Re: Too few memory segments provided exception

2015-07-20 Thread Vasiliki Kalavri
I believe there was some work in progress to reduce memory fragmentation
and solve similar problems.
Does anyone know what's happening with that?

On 20 July 2015 at 16:29, Andra Lungu  wrote:

> I also questioned the vertex-centric approach before. The exact
> computation does not throw this exception so I guess adapting the
> approximate version will do the trick [I also suggested improving the
> algorithm to use less operators offline].
>
> However, the issue still persists. We saw it in Affinity Propagation as
> well... So even if the problem will disappear for this example, I am
> curious how we should handle it in the future.
>
> On Mon, Jul 20, 2015 at 3:15 PM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Hi Shivani,
>>
>> why are you using a vertex-centric iteration to compute the approximate
>> Adamic-Adar?
>> It's not an iterative computation :)
>>
>> In fact, it should be as complex (in terms of operators) as the exact
>> Adamic-Adar, only more efficient because of the different neighborhood
>> representation. Are you having the same problem with the exact computation?
>>
>> Cheers,
>> Vasia.
>>
>> On 20 July 2015 at 14:41, Maximilian Michels  wrote:
>>
>>> Hi Shivani,
>>>
>>> The issue is that by the time the Hash Join is executed, the
>>> MutableHashTable cannot allocate enough memory segments. That means that
>>> your other operators are occupying them. It is fine that this also occurs
>>> on Travis because the workers there have limited memory as well.
>>>
>>> Till suggested to change the memory fraction through the
>>> ExuectionEnvironment. Can you try that?
>>>
>>> Cheers,
>>> Max
>>>
>>> On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge 
>>> wrote:
>>>
>>>> Hello Maximilian,
>>>>
>>>> Thanks for the suggestion. I will use it to check the program. But when
>>>> I am creating a PR for the same implementation with a Test, I am getting
>>>> the same error even on Travis build. So for that what would be the
>>>> solution?
>>>>
>>>> Here is my PR https://github.com/apache/flink/pull/923
>>>> And here is the Travis build status
>>>> https://travis-ci.org/apache/flink/builds/71695078
>>>>
>>>> Also on the IDE it is working fine in Collection execution mode.
>>>>
>>>> Thanks and Regards,
>>>> Shivani
>>>>
>>>> On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels 
>>>> wrote:
>>>>
>>>>> Hi Shivani,
>>>>>
>>>>> Flink doesn't have enough memory to perform a hash join. You need to
>>>>> provide Flink with more memory. You can either increase the
>>>>> "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction"
>>>>> to some value greater than 0.7 and smaller then 1.0. The first config
>>>>> variable allocates more overall memory for Flink; the latter changes the
>>>>> ratio between Flink managed memory (e.g. for hash join) and user memory
>>>>> (for you functions and Gelly's code).
>>>>>
>>>>> If you run this inside an IDE, the memory is configured automatically
>>>>> and you don't have control over that at the moment. You could, however,
>>>>> start a local cluster (./bin/start-local) after you adjusted your
>>>>> flink-conf.yaml and run your programs against that configured cluster. You
>>>>> can do that either through your IDE using a RemoteEnvironment or by
>>>>> submitting the packaged JAR to the local cluster using the command-line
>>>>> tool (./bin/flink).
>>>>>
>>>>> Hope that helps.
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge 
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>  I am working on a problem which implements Adamic Adar Algorithm
>>>>>> using Gelly.
>>>>>> I am running into this exception for all the Joins (including the one
>>>>>> that are part of the reduceOnNeighbors function)
>>>>>>
>>>>>> Too few memory segments provided. Hash Join needs at least 33 memory
>>>>>> segments.
>>>>>>
>>>>>>
>>>>>>

Re: Too few memory segments provided exception

2015-07-20 Thread Vasiliki Kalavri
Hi Shivani,

why are you using a vertex-centric iteration to compute the approximate
Adamic-Adar?
It's not an iterative computation :)

In fact, it should be as complex (in terms of operators) as the exact
Adamic-Adar, only more efficient because of the different neighborhood
representation. Are you having the same problem with the exact computation?

Cheers,
Vasia.

On 20 July 2015 at 14:41, Maximilian Michels  wrote:

> Hi Shivani,
>
> The issue is that by the time the Hash Join is executed, the
> MutableHashTable cannot allocate enough memory segments. That means that
> your other operators are occupying them. It is fine that this also occurs
> on Travis because the workers there have limited memory as well.
>
> Till suggested to change the memory fraction through the
> ExuectionEnvironment. Can you try that?
>
> Cheers,
> Max
>
> On Mon, Jul 20, 2015 at 2:23 PM, Shivani Ghatge 
> wrote:
>
>> Hello Maximilian,
>>
>> Thanks for the suggestion. I will use it to check the program. But when I
>> am creating a PR for the same implementation with a Test, I am getting the
>> same error even on Travis build. So for that what would be the solution?
>>
>> Here is my PR https://github.com/apache/flink/pull/923
>> And here is the Travis build status
>> https://travis-ci.org/apache/flink/builds/71695078
>>
>> Also on the IDE it is working fine in Collection execution mode.
>>
>> Thanks and Regards,
>> Shivani
>>
>> On Mon, Jul 20, 2015 at 2:14 PM, Maximilian Michels 
>> wrote:
>>
>>> Hi Shivani,
>>>
>>> Flink doesn't have enough memory to perform a hash join. You need to
>>> provide Flink with more memory. You can either increase the
>>> "taskmanager.heap.mb" config variable or set "taskmanager.memory.fraction"
>>> to some value greater than 0.7 and smaller then 1.0. The first config
>>> variable allocates more overall memory for Flink; the latter changes the
>>> ratio between Flink managed memory (e.g. for hash join) and user memory
>>> (for you functions and Gelly's code).
>>>
>>> If you run this inside an IDE, the memory is configured automatically
>>> and you don't have control over that at the moment. You could, however,
>>> start a local cluster (./bin/start-local) after you adjusted your
>>> flink-conf.yaml and run your programs against that configured cluster. You
>>> can do that either through your IDE using a RemoteEnvironment or by
>>> submitting the packaged JAR to the local cluster using the command-line
>>> tool (./bin/flink).
>>>
>>> Hope that helps.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Mon, Jul 20, 2015 at 2:04 PM, Shivani Ghatge 
>>> wrote:
>>>
 Hello,
  I am working on a problem which implements Adamic Adar Algorithm using
 Gelly.
 I am running into this exception for all the Joins (including the one
 that are part of the reduceOnNeighbors function)

 Too few memory segments provided. Hash Join needs at least 33 memory
 segments.


 The problem persists even when I comment out some of the joins.

 Even after using edg = edg.join(graph.getEdges(),
 JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).equalTo(0,1).with(new
 JoinEdge());

 as suggested by @AndraLungu the problem persists.

 The code is


 DataSet> degrees = graph.getDegrees();

 //get neighbors of each vertex in the HashSet for it's value
 computedNeighbors = graph.reduceOnNeighbors(new
 GatherNeighbors(), EdgeDirection.ALL);

 //get vertices with updated values for the final Graph which
 will be used to get Adamic Edges
 Vertices = computedNeighbors.join(degrees,
 JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new
 JoinNeighborDegrees());

 Graph, List>>> Long, Double>>>, Double> updatedGraph =
 Graph.fromDataSet(Vertices, edges, env);

 //configure Vertex Centric Iteration
 VertexCentricConfiguration parameters = new
 VertexCentricConfiguration();

 parameters.setName("Find Adamic Adar Edge Weights");

 parameters.setDirection(EdgeDirection.ALL);

 //run Vertex Centric Iteration to get the Adamic Adar Edges
 into the vertex Value
 updatedGraph = updatedGraph.runVertexCentricIteration(new
 GetAdamicAdarEdges(), new NeighborsMessenger(), 1, parameters);

 //Extract Vertices of the updated graph
 DataSet,
 List vertices = updatedGraph.getVertices();

 //Extract the list of Edges from the vertex values
 DataSet> edg = vertices.flatMap(new
 GetAdamicList());

 //Partial weights for the edges are added
 edg = edg.groupBy(0,1).reduce(new AdamGroup());

 //Graph is updated with the Adamic Adar Edges
 edg = edg.join(graph.getEdges(),
 JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND).where(0,1).e

Re: Containment Join Support

2015-07-18 Thread Vasiliki Kalavri
Hi Martin,

I'm really glad to see that you've started using Gelly :)

I think that a graph summarization library method would be a great
addition!
Let me know if you need help and if you want to discuss ideas or other
methods.

Cheers,
Vasia.

On 17 July 2015 at 12:25, Martin Junghanns  wrote:

>  Hi Fabian, hi Stephen,
>
> thanks for answering my question. Good hint with the list replication, I
> will benchmark this vs. cross + filter.
>
> Best, Martin
>
>
> Am 17.07.2015 um 11:17 schrieb Stephan Ewen:
>
> I would rewrite this to replicate the list into tuples:
>
>  "foreach x in list: emit (x, list)"
> Then join on fields 0.
>
>  This replicates the lists, but makes the join very efficient.
>
> On Fri, Jul 17, 2015 at 12:26 AM, Fabian Hueske  wrote:
>
>>Hi Martin,
>>
>>  good to hear that you like Flink :-)
>>  AFAIK, there are no plans to add a containment join. The Flink community
>> is currently working on adding support for outer joins.
>>  Regarding a containment join, I am not sure about the number of use
>> cases. I would rather try to implement it on top of Flink's batch API
>> instead of adding it as an internal feature/operator to the system because
>> this would touch a lot of things (API, optimizer, operator implementation).
>>
>>  There might be better ways to implement a containment join than using a
>> cross and a filter.
>> - Do you know a distributed algorithm for containment joins? Maybe it can
>> be implemented with Flink's API.
>> - I guess, you are implementing a generic graph framework, but can you
>> make certain assumptions about the data such as relative sizes of the
>> inputs or avg/max size of the lists, etc.?
>>
>>  Contributions to Gelly (and Flink in general) are highly welcome.
>>
>>  Best, Fabian
>>
>>
>> 2015-07-16 9:39 GMT+02:00 Martin Junghanns :
>>
>>> Hi everyone,
>>>
>>> at first, thanks for building this great framework! We are using Flink
>>>  and especially Gelly for building a graph analytics stack (gradoop.com
>>> ).
>>>
>>> I was wondering if there is a [planned] support for a containment join
>>> operator. Consider the following example:
>>>
>>> DataSet> left := {[0, 1], [2, 3, 4], [5]}
>>> DataSet> right := {<0, 1>, <1, 0>, <2, 1>, <5, 2>}
>>>
>>> What I want to compute is
>>>
>>> left.join(right).where(list).contains(tuple.f0) :=
>>>
>>> {
>>> <[0, 1], <0,1>>, <[0, 1], <1, 0>>,
>>> <[2, 3, 4], <2, 1>>,
>>> <[5], <5, 2>
>>> }
>>>
>>> At the moment, I am solving that using cross and filter, which can be
>>> expensive.
>>>
>>> The generalization of that operator would be "set containment join",
>>> where you join if the right set is contained in the left set.
>>>
>>> If there is a general need for that operator, I would also like to
>>> contribute to its implementation.
>>>
>>> But maybe, there is already another nice solution which I didn't
>>> discover yet?
>>>
>>> Any help would be appreciated. Especially since I would also like to
>>> contribute some of our graph operators (e.g., graph summarization) back
>>> to Flink/Gelly (current WIP state can be found here: [1]).
>>>
>>> Thanks,
>>>
>>> Martin
>>>
>>>
>>> [1]
>>>
>>> https://github.com/dbs-leipzig/gradoop/blob/%2345_gradoop_flink/gradoop-flink/src/main/java/org/gradoop/model/impl/operators/Summarization.java
>>>
>>>
>>>
>>
>
>


Re: Gelly EOFException

2015-07-18 Thread Vasiliki Kalavri
Hi Flavio,

Gelly currently makes no sanity checks regarding the input graph data.
We decided to leave it to the user to check that they have a valid graph,
for performance reasons.
That means that some Gelly methods may assume that your input graph is valid,
i.e. that it has no duplicate vertices, no invalid edge IDs, etc.
I can't tell whether this particular error is caused by the duplicate
vertices, but it might be.
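If you do want to check it yourself, a quick sanity check for duplicate IDs
before building the graph could look roughly like this (Long IDs and the
vertex value type MyValue are placeholders):

// emit (vertexId, 1) and sum per id: any count > 1 means a duplicate vertex id
DataSet<Tuple2<Long, Integer>> duplicates = vertices
    .map(new MapFunction<Vertex<Long, MyValue>, Tuple2<Long, Integer>>() {
        @Override
        public Tuple2<Long, Integer> map(Vertex<Long, MyValue> v) {
            return new Tuple2<Long, Integer>(v.getId(), 1);
        }
    })
    .groupBy(0).sum(1)
    .filter(new FilterFunction<Tuple2<Long, Integer>>() {
        @Override
        public boolean filter(Tuple2<Long, Integer> t) {
            return t.f1 > 1;
        }
    });
// duplicates.count() == 0 means the vertex ids are unique

For checking that all edge sources/targets actually appear in the vertex set,
there is also the graph validation utility (InvalidVertexIdsValidator) that
Andra mentioned.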

Cheers,
Vasia.

On 16 July 2015 at 20:43, Flavio Pompermaier  wrote:

> but isn't something that cause problem to have multiple vertices with the
> same id?
> On 16 Jul 2015 19:34, "Andra Lungu"  wrote:
>
>> For now, there is a validator that checks whether the vertex ids
>> correspond to the target/src ids in the edges. If you want to check for
>> vertex ID uniqueness, you'll have to implement your own custom validator...
>> I know people with the same error outside Gelly, so I doubt that the lack
>> of unique ids triggered the exception :)
>>
>> On Thu, Jul 16, 2015 at 6:14 PM, Flavio Pompermaier > > wrote:
>>
>>> I thought a bit about this error..in my job I was generating multiple
>>> vertices with the same id.
>>> Could this cause such errors? Maybe there could be a check about such
>>> situations in Gelly..
>>>
>>> On Tue, Jul 14, 2015 at 10:00 PM, Andra Lungu 
>>> wrote:
>>>
 Hello,

 Sorry for the delay. The bug is not in Gelly, but is, as hinted in the
 exception and as can be seen in the logs, in Flink's Runtime. Mihail may
 actually be on to something. The bug is actually very similar to the one
 described in FLINK-1916.

 However, as can be seen in the discussion thread there, it's a bit
 difficult to fix it without some steps to reproduce. I unfortunately
 managed to reproduce it and have opened a Jira... FLINK-2360
 . It's a similar
 delta iteration setting. Hope we can get some help with this.

 Thanks!
 Andra

 On Tue, Jul 14, 2015 at 2:12 PM, Mihail Vieru <
 vi...@informatik.hu-berlin.de> wrote:

>  Hi,
>
> looks very similar to this bug:
> https://issues.apache.org/jira/browse/FLINK-1916
>
> Best,
> Mihail
>
>
> On 14.07.2015 14:09, Andra Lungu wrote:
>
> Hi Flavio,
>
> Could you also show us a code snippet?
>
> On Tue, Jul 14, 2015 at 2:06 PM, Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> Hi to all,
>> in my vertex centric iteration I get the following exception, am I
>> doing something wrong or is it a bug of Gelly?
>>
>>  starting iteration [1]:  CoGroup (Messaging) (6/8)
>> IterationHead(WorksetIteration (Vertex-centric iteration
>> (test.gelly.functions.VUpdateFunction@1814786f
>> | test.gelly.functions.VMessagingFunction@67eecbc2))) (4/8) switched
>> to FAILED with exception.
>> java.io.EOFException
>>  at
>> org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
>>  at
>> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>  at
>> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>>  at
>> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>  at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>  at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:187)
>>  at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
>>  at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
>>  at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>>  at
>> org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
>>  at
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
>>  at
>> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
>>  at
>> org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
>>  at
>> org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
>>  at
>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>  at java.lang.Thread.run(Thread.java:745)
>>
>>
>>  Best,
>> Flavio
>>
>
>
>

>>>
>>>
>>


Re: Gelly forward

2015-07-08 Thread Vasiliki Kalavri
Thanks Flavio.

Here's my suggestion. If your paths are short and don't vary much, it might
make sense to do this using the DataSet API. The iteration will probably be
too much overhead.
For example, if you want to find the paths for "livesIn.containedIn.name",
then you can create 3 DataSets, one for each path hop, by simply filtering
by edgeName. Then, you can easily create the final "messages", joining the
first 2 DataSets (to get the "livesIn.containedIn" path) and then joining the
output with the third.
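A rough sketch of that join-based approach, assuming the relationship edges
are available as Tuple3<Long, String, Long> of (source, label, target) and the
literal attributes as Tuple3<Long, String, String> of (vertexId, key, value);
both are just illustrative representations, not your actual types:

DataSet<Tuple3<Long, String, Long>> livesIn = edges.filter(new FilterFunction<Tuple3<Long, String, Long>>() {
    @Override public boolean filter(Tuple3<Long, String, Long> e) { return e.f1.equals("livesIn"); }
});
DataSet<Tuple3<Long, String, Long>> containedIn = edges.filter(new FilterFunction<Tuple3<Long, String, Long>>() {
    @Override public boolean filter(Tuple3<Long, String, Long> e) { return e.f1.equals("containedIn"); }
});
DataSet<Tuple3<Long, String, String>> names = properties.filter(new FilterFunction<Tuple3<Long, String, String>>() {
    @Override public boolean filter(Tuple3<Long, String, String> p) { return p.f1.equals("name"); }
});

// person -> place -> country: join livesIn.target with containedIn.source
DataSet<Tuple2<Long, Long>> personToCountry = livesIn
    .join(containedIn).where(2).equalTo(0)
    .with(new JoinFunction<Tuple3<Long, String, Long>, Tuple3<Long, String, Long>, Tuple2<Long, Long>>() {
        @Override
        public Tuple2<Long, Long> join(Tuple3<Long, String, Long> lives, Tuple3<Long, String, Long> contained) {
            return new Tuple2<Long, Long>(lives.f0, contained.f2); // (personId, countryId)
        }
    });

// attach the country's name: (personId, "livesIn.containedIn.name", value)
DataSet<Tuple3<Long, String, String>> result = personToCountry
    .join(names).where(1).equalTo(0)
    .with(new JoinFunction<Tuple2<Long, Long>, Tuple3<Long, String, String>, Tuple3<Long, String, String>>() {
        @Override
        public Tuple3<Long, String, String> join(Tuple2<Long, Long> path, Tuple3<Long, String, String> name) {
            return new Tuple3<Long, String, String>(path.f0, "livesIn.containedIn.name", name.f2);
        }
    });

The single-hop "livesIn.name" values fall out of a similar two-way join
between livesIn and names.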

If your paths are long and you think that the above solution doesn't work,
then you could use iterations.
First, I think you will need to have a way to check whether an attribute
exists in a path and in which position. Using the same example, if you have
the path "livesIn.containedIn.name", then you need a method that would get
a path hop, e.g. "containedIn" and return the hop number, e.g. 1 for
"livesIn", 2 for "containedIn", 3 for "name" (maybe -1 if it's not there).
Once you have this, then you can use it to check whether to send a message
over an edge or not.
Your Messaging function could look like this:

for (Edge<Long, String> edge : getEdges()) {
  if (getPathHop(edge.getValue()) == getSuperstepNumber()) { // forward the msg
    sendMessageTo(edge.getTarget(), msg);
  }
}

Like this, in superstep 1 you will send a msg over edges labeled as
"livesIn", in superstep 2 you will send a message over the edges labeled as
"containedIn" and in superstep 3 you will find the ones with "name".

Would this work for you?

Cheers,
Vasia.


On 8 July 2015 at 14:52, Flavio Pompermaier  wrote:

> In my Implementation every vertex has its own bags of "knowledge" (i.e. it
> knows all tuples belonging to it..).
> So in Vertex 1 I have a field (an HashMap) containing the following info:
>
>- type=Person
>- livesIn=2 (and I know also that 2 is a vertexId)
>
> In Vertex 3 I know:
>
>- type=Place
>- name=Berlin
>- containedIn=4 (and I know also that 4 is a vertexId)
>
> and so on..
>
>
> On Wed, Jul 8, 2015 at 2:29 PM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> yes, I think it's possible. I have one question before I try to explain
>> how:
>> do you model "Rome", "Berlin", "101.3" etc. in your above example as
>> separate vertices or as properties of vertices?
>>
>> On 8 July 2015 at 10:43, Flavio Pompermaier  wrote:
>>
>>> Let's say I have some nodes of some type of interest (type1 and type2).
>>> My source data looks like .
>>> For example, I could be interested in sourceNodes having type == Person
>>> to gather the value obtained from the expansion of some paths (for
>>> example livesIn.name and marriedTo.name). Notice that I could define other
>>> paths of interest for different types.. (For nodes of type Place I could be
>>> interested in gathering containedIn.name).
>>> If my source data looks like:
>>>
>>> (1, Person, livesIn, 2)
>>> (1, Person, livesIn, 3)
>>> (2, Place, name, "Rome")
>>> (2, Place, lat, 101.3)
>>> (2, Place, long, 101.3)
>>> (3, Place, name, "Berlin")
>>> (3, Place, containedIn, 4)
>>> (4, Country, name, "Germany")
>>>
>>> I'd like that node 1 (in the end) collect the following paths:
>>>
>>> livesIn.name : "Rome" (from node 2)
>>> livesIn.name : "Berlin" (from node 3)
>>> livesIn.containedIn.name: "Germany" (from node 4)
>>> marriedTo.name : null because not married :)
>>>
>>> So, in my vertexCentricIteration each Vertex knows its neighbors (e.g.
>>> node 1 knows that 2 and 3 should be queried for their "name" attribute).
>>> If they receive a message asking for "name" from node 1 they have to
>>> reply to node 1 with the value of that property.
>>>
>>> So in my implementation, I check whether my node has to send some query
>>> to neighbors and wait for the response. The problem is that node 3 for
>>> example, once queried for property containedIn.name from node 1 it just
>>> have to forward this path to node 4 and thell to 4 to reply to 1.
>>>
>>> Is that possible?
>>>
>>>
>>> On Wed, Jul 8, 2015 at 10:19 AM, Vasiliki Kalavri <
>>> vasilikikala...@gmail.com> wrote:
>>>
>>>> Is it the same message that you propagate or is it different for each
>>>> vertex / neighbor? If you have to store a  pair for each

Re: Gelly forward

2015-07-08 Thread Vasiliki Kalavri
Hi Flavio,

yes, I think it's possible. I have one question before I try to explain
how:
do you model "Rome", "Berlin", "101.3" etc. in your above example as
separate vertices or as properties of vertices?

On 8 July 2015 at 10:43, Flavio Pompermaier  wrote:

> Let's say I have some nodes of some type of interest (type1 and type2).
> My source data looks like .
> For example, I could be interested in sourceNodes having type == Person
> to gather the value obtained from the expansion of some paths (for
> example livesIn.name and marriedTo.name). Notice that I could define other
> paths of interest for different types.. (For nodes of type Place I could be
> interested in gathering containedIn.name).
> If my source data looks like:
>
> (1, Person, livesIn, 2)
> (1, Person, livesIn, 3)
> (2, Place, name, "Rome")
> (2, Place, lat, 101.3)
> (2, Place, long, 101.3)
> (3, Place, name, "Berlin")
> (3, Place, containedIn, 4)
> (4, Country, name, "Germany")
>
> I'd like that node 1 (in the end) collect the following paths:
>
> livesIn.name : "Rome" (from node 2)
> livesIn.name : "Berlin" (from node 3)
> livesIn.containedIn.name: "Germany" (from node 4)
> marriedTo.name : null because not married :)
>
> So, in my vertexCentricIteration each Vertex knows its neighbors (e.g.
> node 1 knows that 2 and 3 should be queried for their "name" attribute).
> If they receive a message asking for "name" from node 1 they have to reply
> to node 1 with the value of that property.
>
> So in my implementation, I check whether my node has to send some query to
> neighbors and wait for the response. The problem is that node 3 for
> example, once queried for property containedIn.name from node 1 it just
> have to forward this path to node 4 and thell to 4 to reply to 1.
>
> Is that possible?
>
>
> On Wed, Jul 8, 2015 at 10:19 AM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Is it the same message that you propagate or is it different for each
>> vertex / neighbor? If you have to store a  pair for each
>> neighbor, then you will quickly run out of memory. If it's the same message
>> you need to send, or you simply need to add the current vertex Id, then you
>> can probably get rid of the neighborID.
>>
>> By "outbox" I believe you mean storing them in the vertex value, correct?
>> I don't think you will have to explicitly reset it, as in each superstep
>> vertices only receive messages sent in the previous superstep, i.e. "old"
>> messages don't get re-sent.
>> On Jul 8, 2015 9:48 AM, "Flavio Pompermaier" 
>> wrote:
>>
>>> The problem is that my nodes have to gather data coming from some path
>>> of interest along the graph (depending on the type of the node), otherwise
>>> they just have to forward the received message adding their id to the
>>> message path (more or less). It's like a postal tracking system.
>>>
>>> The problem is that I have to reset the "outbox" of each vertex once the
>>> messages have been sent..
>>> Do you think that it makes sense in this case to have an outbox of
>>> messages (destination, message) in each vertex and reset it in the
>>> postSuperstep() of the VertexUpdateFunction?
>>>
>>> On Wed, Jul 8, 2015 at 9:38 AM, Vasiliki Kalavri <
>>> vasilikikala...@gmail.com> wrote:
>>>
>>>> Hi Flavio!
>>>>
>>>> Are you talking about vertex-centric iterations in gelly?
>>>>
>>>> If yes, you can send messages to a particular vertex with
>>>> "sendMessageTo(vertexId, msg)" and
>>>> to all neighbors with "sendMessageToAllNeighbors(msg)". These methods
>>>> are available inside the MessagingFunction.
>>>> Accessing received messages happens inside the VertexUpdateFunction.
>>>> So, the usual way of writing these programs is to:
>>>> (1) go through received messages in the VertexUpdateFunction and
>>>> compute the new vertex value based them
>>>> (2) compute and send messages in the MessagingFunction.
>>>>
>>>> Would that work in you case?
>>>>
>>>> Cheers,
>>>> Vasia.
>>>>
>>>> On 8 July 2015 at 08:47, Flavio Pompermaier 
>>>> wrote:
>>>>
>>>>>
>>>>> Hi to all,
>>>>> is there a way in gelly to forward received messages (and modify their
>>>>> content before sending)?
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>>
>>>>
>>>
>


Re: Gelly forward

2015-07-08 Thread Vasiliki Kalavri
Is it the same message that you propagate or is it different for each
vertex / neighbor? If you have to store a <neighborID, message> pair for each
neighbor, then you will quickly run out of memory. If it's the same message
you need to send, or you simply need to add the current vertex Id, then you
can probably get rid of the neighborID.

By "outbox" I believe you mean storing them in the vertex value, correct?
I don't think you will have to explicitly reset it, as in each superstep
vertices only receive messages sent in the previous superstep, i.e. "old"
messages don't get re-sent.
On Jul 8, 2015 9:48 AM, "Flavio Pompermaier"  wrote:

> The problem is that my nodes have to gather data coming from some path of
> interest along the graph (depending on the type of the node), otherwise
> they just have to forward the received message adding their id to the
> message path (more or less). It's like a postal tracking system.
>
> The problem is that I have to reset the "outbox" of each vertex once the
> messages have been sent..
> Do you think that it makes sense in this case to have an outbox of
> messages (destination, message) in each vertex and reset it in the
> postSuperstep() of the VertexUpdateFunction?
>
> On Wed, Jul 8, 2015 at 9:38 AM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Hi Flavio!
>>
>> Are you talking about vertex-centric iterations in gelly?
>>
>> If yes, you can send messages to a particular vertex with
>> "sendMessageTo(vertexId, msg)" and
>> to all neighbors with "sendMessageToAllNeighbors(msg)". These methods are
>> available inside the MessagingFunction.
>> Accessing received messages happens inside the VertexUpdateFunction.
>> So, the usual way of writing these programs is to:
>> (1) go through received messages in the VertexUpdateFunction and compute
>> the new vertex value based them
>> (2) compute and send messages in the MessagingFunction.
>>
>> Would that work in you case?
>>
>> Cheers,
>> Vasia.
>>
>> On 8 July 2015 at 08:47, Flavio Pompermaier  wrote:
>>
>>>
>>> Hi to all,
>>> is there a way in gelly to forward received messages (and modify their
>>> content before sending)?
>>>
>>> Best,
>>> Flavio
>>>
>>>
>>
>


Re: Gelly forward

2015-07-08 Thread Vasiliki Kalavri
Hi Flavio!

Are you talking about vertex-centric iterations in gelly?

If yes, you can send messages to a particular vertex with
"sendMessageTo(vertexId, msg)" and
to all neighbors with "sendMessageToAllNeighbors(msg)". These methods are
available inside the MessagingFunction.
Accessing received messages happens inside the VertexUpdateFunction.
So, the usual way of writing these programs is to:
(1) go through received messages in the VertexUpdateFunction and compute
the new vertex value based on them
(2) compute and send messages in the MessagingFunction.
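In code, the skeleton looks roughly like the following. The types (Long ids,
Long values, Long messages, no edge values) are placeholders, and the exact
method signatures can differ slightly between Gelly versions, so treat this as
a sketch:

// (1) combine received messages into a new vertex value
public static final class MyUpdater extends VertexUpdateFunction<Long, Long, Long> {
    @Override
    public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
        long min = vertex.getValue();
        for (long msg : inMessages) {
            min = Math.min(min, msg);
        }
        if (min < vertex.getValue()) {
            setNewVertexValue(min); // only updated vertices are active in the next superstep
        }
    }
}

// (2) compute and send messages based on the (possibly updated) vertex value
public static final class MyMessenger extends MessagingFunction<Long, Long, Long, NullValue> {
    @Override
    public void sendMessages(Vertex<Long, Long> vertex) {
        sendMessageToAllNeighbors(vertex.getValue());
        // or sendMessageTo(someVertexId, someMessage) for a specific target
    }
}

Graph<Long, Long, NullValue> result =
    graph.runVertexCentricIteration(new MyUpdater(), new MyMessenger(), maxIterations);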

Would that work in your case?

Cheers,
Vasia.

On 8 July 2015 at 08:47, Flavio Pompermaier  wrote:

>
> Hi to all,
> is there a way in gelly to forward received messages (and modify their
> content before sending)?
>
> Best,
> Flavio
>
>


Re: Benchmark results between Flink and Spark

2015-07-06 Thread Vasiliki Kalavri
Hi,

Apart from the amplab benchmark, you might also find [1] and [2]
interesting. The first is a survey on existing benchmarks, while the second
proposes one. However, they are also limited to SQL-like queries.

Regarding graph processing benchmarks, I recently came across Graphalytics
[3]. The benchmark currently supports Giraph, GraphLab, Graph-X, MapReduce
and Neo4j. I hope we can add Gelly to this list soon!

Unfortunately, I'm not aware of any large-scale ML or streaming benchmarks.

Cheers,
Vasia.

[1]: http://arxiv.org/pdf/1402.5194.pdf
[2]:
http://msrg.utoronto.ca/publications/pdf_files/2013/Ghazal13-BigBench:_Towards_an_Industry_Standa.pdf
[3]: http://event.cwi.nl/grades2015/07-capota.pdf

On 6 July 2015 at 19:03, Slim Baltagi  wrote:

> Hi Hawin
>
> What you shared is not 'the Spark benchmark'.
> This benchmark measures response time on a handful of relational queries of
> different tools including Shark.
> Shark development was ended a year ago on July 1, 2014 in favor of Spark
> SQL
> which graduated from an alpha project on March 13, 2015.
> I am not aware of any published benchmark between Spark and Flink by a
> third
> party except the one that I shared from a conference paper:
> http://goo.gl/WocQci
> I hope this helps.
>
> Slim Baltagi
> Apache Flink Knowledge Base ( Now with over 300 categorized web resources!)
> http://sparkbigdata.com/component/tags/tag/27-flink
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Benchmark-results-between-Flink-and-Spark-tp1940p1961.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: ArrayIndexOutOfBoundsException when running job from JAR

2015-06-29 Thread Vasiliki Kalavri
Thank you for the answer Robert!

I realize it's a single JVM running, yet I would expect programs to behave
in the same way, i.e. serialization to happen (even if not necessary), in
order to catch this kind of bug before cluster deployment.
Is this simply not possible or is it a design choice we made for some
reason?

-V.

On 29 June 2015 at 09:53, Robert Metzger  wrote:

> It is working in the IDE because there we execute everything in the same
> JVM, so the mapper can access the correct value of the static variable.
> When submitting a job with the CLI frontend, there are at least two JVMs
> involved, and code running in the JM/TM can not access the value from the
> static variable in the Cli frontend.
>
> On Sun, Jun 28, 2015 at 9:43 PM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> Mihail and I have now solved the issue.
>>
>> The exception was caused because the array size in question was read from
>> a static field of the enclosing class, inside an anonymous mapper. Making
>> the mapper a standalone class and passing the array size to the constructor
>> solved the issue.
>>
>> What I don't understand though, is why this worked fine when the job was
>> executed from inside the IDE. Is serialization handled differently
>> (skipped) in this case?
>>
>> Cheers,
>> Vasia.
>>
>> On 26 June 2015 at 11:30, Mihail Vieru 
>> wrote:
>>
>>>  Hi Vasia,
>>>
>>> *InitVerticesMapper* is called in the run method of APSP:
>>>
>>> *@Override*
>>> *public Graph, NullValue> run(Graph>> Tuple2, NullValue> input) {*
>>>
>>> *VertexCentricConfiguration parameters = new
>>> VertexCentricConfiguration();*
>>> *parameters.setSolutionSetUnmanagedMemory(false);*
>>>
>>> *return input.mapVertices(new
>>> InitVerticesMapper(srcVertexId))*
>>> *.runVertexCentricIteration(new VertexDistanceUpdater>> Tuple2, Integer>(srcVertexId),*
>>> *new MinDistanceMessenger>> Tuple2, Integer, NullValue>(srcVertexId),*
>>> *maxIterations, parameters);*
>>> *}*
>>>
>>> I'll send you the full code via a private e-mail.
>>>
>>> Cheers,
>>> Mihail
>>>
>>>
>>> On 26.06.2015 11:10, Vasiliki Kalavri wrote:
>>>
>>>  Hi Mihail,
>>>
>>>  could you share your code or at least the implementations of
>>> getVerticesDataSet() and InitVerticesMapper so I can take a look?
>>> Where is InitVerticesMapper called above?
>>>
>>>  Cheers,
>>> Vasia.
>>>
>>>
>>> On 26 June 2015 at 10:51, Mihail Vieru 
>>> wrote:
>>>
>>>>  Hi Robert,
>>>>
>>>> I'm using the same input data, as well as the same parameters I use in
>>>> the IDE's run configuration.
>>>> I don't run the job on the cluster (yet), but locally, by starting
>>>> Flink with the start-local.sh script.
>>>>
>>>>
>>>> I will try to explain my code a bit. The *Integer[] *array is
>>>> initialized in the *getVerticesDataSet()* method.
>>>>
>>>> *DataSet >> vertices =
>>>> getVerticesDataSet(env);*
>>>> *...*
>>>> *Graph, NullValue> graph =
>>>> Graph.fromDataSet(vertices, edges, env);*
>>>> *...*
>>>> *Graph, NullValue>
>>>> intermediateGraph = *
>>>> *graph.run(new APSP(srcVertexId,
>>>> maxIterations));*
>>>>
>>>>
>>>> In APSP I'm addressing it in the *InitVerticesMapper*, but is now
>>>> suddenly empty.
>>>>
>>>> Best,
>>>> Mihail
>>>>
>>>>
>>>> On 26.06.2015 10:00, Robert Metzger wrote:
>>>>
>>>> Hi Mihail,
>>>>
>>>>  the NPE has been thrown from
>>>> *graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)*. I guess
>>>> that is code written by you or a library you are using.
>>>> Maybe the data you are using on the cluster is different from your
>>>> local test data?
>>>>
>>>>  Best,
>>>> Robert
>>>>
>>>>
>>>> On Thu, Jun 25, 2015 at 7:41 PM, Mihail Vieru <
>>>> vi...@informatik.hu-berlin.d

Re: ArrayIndexOutOfBoundsException when running job from JAR

2015-06-28 Thread Vasiliki Kalavri
Hi everyone,

Mihail and I have now solved the issue.

The exception was caused because the array size in question was read from a
static field of the enclosing class, inside an anonymous mapper. Making the
mapper a standalone class and passing the array size to the constructor
solved the issue.
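
For reference, here is a minimal sketch of the broken pattern and of the fix;
the class, method and field names are made up for illustration and are not
taken from the actual job:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;

public class SerializationPitfall {

    // Set in main() on the client; NOT shipped to the TaskManager JVMs.
    static int arraySize;

    // Broken: the anonymous mapper reads the static field at map() time.
    // In the IDE (one JVM) this works; submitted via the CLI, the TaskManager
    // sees the default value 0 and the arrays come out empty.
    public static DataSet<Integer[]> broken(DataSet<Long> input) {
        return input.map(new MapFunction<Long, Integer[]>() {
            @Override
            public Integer[] map(Long value) {
                return new Integer[arraySize];
            }
        });
    }

    // Fixed: the size travels with the serialized function instance itself.
    public static class InitArrayMapper implements MapFunction<Long, Integer[]> {
        private final int size;

        public InitArrayMapper(int size) {
            this.size = size;
        }

        @Override
        public Integer[] map(Long value) {
            return new Integer[size];
        }
    }

    public static DataSet<Integer[]> fixed(DataSet<Long> input, int size) {
        return input.map(new InitArrayMapper(size));
    }
}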

What I don't understand though, is why this worked fine when the job was
executed from inside the IDE. Is serialization handled differently
(skipped) in this case?

Cheers,
Vasia.

On 26 June 2015 at 11:30, Mihail Vieru 
wrote:

>  Hi Vasia,
>
> *InitVerticesMapper* is called in the run method of APSP:
>
> @Override
> public Graph<Long, Tuple2<Long, Integer[]>, NullValue> run(
>         Graph<Long, Tuple2<Long, Integer[]>, NullValue> input) {
>
>     VertexCentricConfiguration parameters = new VertexCentricConfiguration();
>     parameters.setSolutionSetUnmanagedMemory(false);
>
>     return input.mapVertices(new InitVerticesMapper(srcVertexId))
>             .runVertexCentricIteration(
>                 new VertexDistanceUpdater<Long, Tuple2<Long, Integer[]>, Integer>(srcVertexId),
>                 new MinDistanceMessenger<Long, Tuple2<Long, Integer[]>, Integer, NullValue>(srcVertexId),
>                 maxIterations, parameters);
> }
>
> I'll send you the full code via a private e-mail.
>
> Cheers,
> Mihail
>
>
> On 26.06.2015 11:10, Vasiliki Kalavri wrote:
>
>  Hi Mihail,
>
>  could you share your code or at least the implementations of
> getVerticesDataSet() and InitVerticesMapper so I can take a look?
> Where is InitVerticesMapper called above?
>
>  Cheers,
> Vasia.
>
>
> On 26 June 2015 at 10:51, Mihail Vieru 
> wrote:
>
>>  Hi Robert,
>>
>> I'm using the same input data, as well as the same parameters I use in
>> the IDE's run configuration.
>> I don't run the job on the cluster (yet), but locally, by starting Flink
>> with the start-local.sh script.
>>
>>
>> I will try to explain my code a bit. The *Integer[] *array is
>> initialized in the *getVerticesDataSet()* method.
>>
>> *DataSet >> vertices =
>> getVerticesDataSet(env);*
>> *...*
>> *Graph, NullValue> graph =
>> Graph.fromDataSet(vertices, edges, env);*
>> *...*
>> *Graph, NullValue>
>> intermediateGraph = *
>> *graph.run(new APSP(srcVertexId,
>> maxIterations));*
>>
>>
>> In APSP I'm addressing it in the *InitVerticesMapper*, but is now
>> suddenly empty.
>>
>> Best,
>> Mihail
>>
>>
>> On 26.06.2015 10:00, Robert Metzger wrote:
>>
>> Hi Mihail,
>>
>>  the NPE has been thrown from
>> *graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)*. I guess that
>> is code written by you or a library you are using.
>> Maybe the data you are using on the cluster is different from your local
>> test data?
>>
>>  Best,
>> Robert
>>
>>
>> On Thu, Jun 25, 2015 at 7:41 PM, Mihail Vieru <
>> vi...@informatik.hu-berlin.de> wrote:
>>
>>>  Hi,
>>>
>>> I get an ArrayIndexOutOfBoundsException when I run my job from a JAR in
>>> the CLI.
>>> This doesn't occur in the IDE.
>>>
>>> I've build the JAR using the "maven-shade-plugin" and the pom.xml
>>> configuration Robert has provided here:
>>>
>>> https://stackoverflow.com/questions/30102523/linkage-failure-when-running-apache-flink-jobs
>>> I specify the entry point using the "-c" option.
>>>
>>> The array the Exception refers to is actually initialized when a
>>> vertices dataset is read from the file system.
>>>
>>> Any ideas on what could cause this issue?
>>>
>>> Best,
>>> Mihail
>>>
>>> P.S.: the stack trace:
>>>
>>> *org.apache.flink.client.program.ProgramInvocationException: The program
>>> execution failed: Job execution failed.*
>>> *at org.apache.flink.client.program.Client.run(Client.java:413)*
>>> *at org.apache.flink.client.program.Client.run(Client.java:356)*
>>> *at org.apache.flink.client.program.Client.run(Client.java:349)*
>>> *at
>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)*
>>> *at graphdistance.KAPSPNaiveJob.main(KAPSPNaiveJob.java:56)*
>>> *at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)*
>>> *at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)*
>>> *at
>>>

Re: ArrayIndexOutOfBoundsException when running job from JAR

2015-06-26 Thread Vasiliki Kalavri
Hi Mihail,

could you share your code or at least the implementations of
getVerticesDataSet() and InitVerticesMapper so I can take a look?
Where is InitVerticesMapper called above?

Cheers,
Vasia.


On 26 June 2015 at 10:51, Mihail Vieru 
wrote:

>  Hi Robert,
>
> I'm using the same input data, as well as the same parameters I use in the
> IDE's run configuration.
> I don't run the job on the cluster (yet), but locally, by starting Flink
> with the start-local.sh script.
>
>
> I will try to explain my code a bit. The *Integer[] *array is initialized
> in the *getVerticesDataSet()* method.
>
> DataSet<Vertex<Long, Tuple2<Long, Integer[]>>> vertices = getVerticesDataSet(env);
> ...
> Graph<Long, Tuple2<Long, Integer[]>, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
> ...
> Graph<Long, Tuple2<Long, Integer[]>, NullValue> intermediateGraph =
>         graph.run(new APSP(srcVertexId, maxIterations));
>
>
> In APSP I'm addressing it in the *InitVerticesMapper*, but is now
> suddenly empty.
>
> Best,
> Mihail
>
>
> On 26.06.2015 10:00, Robert Metzger wrote:
>
> Hi Mihail,
>
>  the NPE has been thrown from
> *graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)*. I guess that
> is code written by you or a library you are using.
> Maybe the data you are using on the cluster is different from your local
> test data?
>
>  Best,
> Robert
>
>
> On Thu, Jun 25, 2015 at 7:41 PM, Mihail Vieru <
> vi...@informatik.hu-berlin.de> wrote:
>
>>  Hi,
>>
>> I get an ArrayIndexOutOfBoundsException when I run my job from a JAR in
>> the CLI.
>> This doesn't occur in the IDE.
>>
>> I've build the JAR using the "maven-shade-plugin" and the pom.xml
>> configuration Robert has provided here:
>>
>> https://stackoverflow.com/questions/30102523/linkage-failure-when-running-apache-flink-jobs
>> I specify the entry point using the "-c" option.
>>
>> The array the Exception refers to is actually initialized when a vertices
>> dataset is read from the file system.
>>
>> Any ideas on what could cause this issue?
>>
>> Best,
>> Mihail
>>
>> P.S.: the stack trace:
>>
>> *org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Job execution failed.*
>> *at org.apache.flink.client.program.Client.run(Client.java:413)*
>> *at org.apache.flink.client.program.Client.run(Client.java:356)*
>> *at org.apache.flink.client.program.Client.run(Client.java:349)*
>> *at
>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)*
>> *at graphdistance.KAPSPNaiveJob.main(KAPSPNaiveJob.java:56)*
>> *at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)*
>> *at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)*
>> *at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
>> *at java.lang.reflect.Method.invoke(Method.java:606)*
>> *at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)*
>> *at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)*
>> *at org.apache.flink.client.program.Client.run(Client.java:315)*
>> *at
>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)*
>> *at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)*
>> *at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)*
>> *at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)*
>> *Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.*
>> *at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)*
>> *at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)*
>> *at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)*
>> *at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)*
>> *at
>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)*
>> *at
>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)*
>> *at
>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)*
>> *at
>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)*
>> *at akka.actor.Actor$class.aroundReceive(Actor.scala:465)*
>> *at
>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)*
>> *at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)*
>> *at akka.actor.ActorCell.invoke(ActorCell.scala:487)*
>> *at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)*
>> *at akka.dispatch.Mailbox.run(Mailbox.scala:221)*
>> *at akka.dispatch.Mailbox.exec(Mailbox.scala:231)*
>> *at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)*
>> *at

Re: Gelly available already?

2015-03-24 Thread Vasiliki Kalavri
Hi all,

there is no Scala API for Gelly yet and no corresponding JIRA either.
It's definitely in our plans, just not for 0.9 :-)

Cheers,
-V.

On 24 March 2015 at 00:21, Henry Saputra  wrote:

> Any JIRA filed to add Scala counterparts for Gelly?
>
> - Henry
>
> On Mon, Mar 23, 2015 at 3:44 PM, Andra Lungu 
> wrote:
> > For now it just works with the Java API.
> >
> > On Mon, Mar 23, 2015 at 11:42 PM, Sebastian 
> wrote:
> >>
> >> Is gelly supposed to be usable from Scala? It looks as it is hardcoded
> to
> >> use the Java API.
> >>
> >> Best,
> >> Sebastian
> >>
> >> On 23.03.2015 23:15, Robert Metzger wrote:
> >>>
> >>> Hi,
> >>>
> >>> Gelly is not part of any offical flink release.
> >>> You have to use a Snapshot version of Flink if you want to try it out.
> >>>
> >>>
> >>> Sent from my iPhone
> >>>
> >>> On 23.03.2015, at 23:10, Andra Lungu  >>> > wrote:
> >>>
>  Hi Sebastian,
> 
>  For me it works just as described there, with 0.9, but there should be
>  no problem for 0.8.1.
>  Here is an example pom.xml
>  https://github.com/andralungu/gelly-partitioning/blob/first/pom.xml
> 
>  Hope that helps!
>  Andra
> 
>  On Mon, Mar 23, 2015 at 11:02 PM, Sebastian   > wrote:
> 
>  Hi,
> 
>  Is gelly already usable in the 0.8.1 release? I tried adding
> 
>  <dependency>
>      <groupId>org.apache.flink</groupId>
>      <artifactId>flink-gelly</artifactId>
>      <version>0.8.1</version>
>  </dependency>
> 
>  as described in [1], but my project fails to build.
> 
>  Best,
>  Sebastian
> 
> 
>  [1]
>  http://ci.apache.org/projects/flink/flink-docs-master/gelly_guide.html
> 
> >
>


Re: RuntimeException Gelly API: Memory ran out. Compaction failed.

2015-03-18 Thread Vasiliki Kalavri
haha, yes, actually I just confirmed!
If I flip my args, I get the error you mention in the first e-mail. You're
trying to generate a graph by passing the edge list as the vertex list, and
that dataset is far too big for your memory settings (cf. ~15m edges vs. the
actual ~400k vertices).

I hope that clears everything up :-)

Cheers,
V.

On 18 March 2015 at 23:44, Vasiliki Kalavri 
wrote:

> Well, one thing I notice is that your vertices and edges args are flipped.
> Might be the source of error :-)
>
> On 18 March 2015 at 23:04, Mihail Vieru 
> wrote:
>
>>  I'm also using 0 as sourceID. The exact program arguments:
>>
>> 0 /home/vieru/dev/flink-experiments/data/social_network.edgelist
>> /home/vieru/dev/flink-experiments/data/social_network.verticeslist
>> /home/vieru/dev/flink-experiments/sssp-output-higgstwitter 10
>>
>> And yes, I call both methods on the initialized Graph *mappedInput*. I
>> don't understand why the distances are computed correctly for the small
>> graph (also read from files) but not for the larger one.
>> The messages appear to be wrong in the latter case.
>>
>>
>> On 18.03.2015 21:55, Vasiliki Kalavri wrote:
>>
>>  hmm, I'm starting to run out of ideas...
>> What's your source ID parameter? I ran mine with 0.
>> About the result, you call both createVertexCentricIteration() and
>> runVertexCentricIteration() on the initialized graph, right?
>>
>> On 18 March 2015 at 22:33, Mihail Vieru 
>> wrote:
>>
>>>  Hi Vasia,
>>>
>>> yes, I am using the latest master. I just did a pull again and the
>>> problem persists. Perhaps Robert could confirm as well.
>>>
>>> I've set the solution set to unmanaged in SSSPUnweighted as Stephan
>>> proposed and the job finishes. So I am able to proceed using this
>>> workaround.
>>>
>>> An odd thing occurs now though. The distances aren't computed correctly
>>> for the SNAP graph and remain the one set in InitVerticesMapper(). For the
>>> small graph in SSSPDataUnweighted they are OK. I'm currently investigating
>>> this behavior.
>>>
>>> Cheers,
>>> Mihail
>>>
>>>
>>> On 18.03.2015 20:55, Vasiliki Kalavri wrote:
>>>
>>>  Hi Mihail,
>>>
>>>  I used your code to generate the vertex file, then gave this and the
>>> edge list as input to your SSSP implementation and still couldn't reproduce
>>> the exception. I'm using the same local setup as I describe above.
>>> I'm not aware of any recent changes that might be relevant, but, just in
>>> case, are you using the latest master?
>>>
>>>  Cheers,
>>> V.
>>>
>>> On 18 March 2015 at 19:21, Mihail Vieru 
>>> wrote:
>>>
>>>>  Hi Vasia,
>>>>
>>>> I have used a simple job (attached) to generate a file which looks like
>>>> this:
>>>>
>>>> 0 0
>>>> 1 1
>>>> 2 2
>>>> ...
>>>> 456629 456629
>>>> 456630 456630
>>>>
>>>> I need the vertices to be generated from a file for my future work.
>>>>
>>>> Cheers,
>>>> Mihail
>>>>
>>>>
>>>>
>>>> On 18.03.2015 17:04, Vasiliki Kalavri wrote:
>>>>
>>>>  Hi Mihail, Robert,
>>>>
>>>>  I've tried reproducing this, but I couldn't.
>>>> I'm using the same twitter input graph from SNAP that you link to and
>>>> also Scala IDE.
>>>> The job finishes without a problem (both the SSSP example from Gelly
>>>> and the unweighted version).
>>>>
>>>>  The only thing I changed to run your version was creating the graph
>>>> from the edge set only, i.e. like this:
>>>>
>>>>  Graph graph = Graph.fromDataSet(edges,
>>>>  new MapFunction() {
>>>>  public Long map(Long value) {
>>>>  return Long.MAX_VALUE;
>>>>  }
>>>>  }, env);
>>>>
>>>> Since the twitter input is an edge list, how do you generate the vertex
>>>> dataset in your case?
>>>>
>>>>  Thanks,
>>>> -Vasia.
>>>>
>>>> On 18 March 2015 at 16:54, Mihail Vieru 
>>>> wrote:
>>>>
>>>>>  Hi,
>>>>>
>>>>> great! Thanks!
>>>>>
>>>>> I really need this bug fixed because I'm laying the groundwo

Re: RuntimeException Gelly API: Memory ran out. Compaction failed.

2015-03-18 Thread Vasiliki Kalavri
Well, one thing I notice is that your vertices and edges args are flipped.
Might be the source of error :-)

On 18 March 2015 at 23:04, Mihail Vieru 
wrote:

>  I'm also using 0 as sourceID. The exact program arguments:
>
> 0 /home/vieru/dev/flink-experiments/data/social_network.edgelist
> /home/vieru/dev/flink-experiments/data/social_network.verticeslist
> /home/vieru/dev/flink-experiments/sssp-output-higgstwitter 10
>
> And yes, I call both methods on the initialized Graph *mappedInput*. I
> don't understand why the distances are computed correctly for the small
> graph (also read from files) but not for the larger one.
> The messages appear to be wrong in the latter case.
>
>
> On 18.03.2015 21:55, Vasiliki Kalavri wrote:
>
>  hmm, I'm starting to run out of ideas...
> What's your source ID parameter? I ran mine with 0.
> About the result, you call both createVertexCentricIteration() and
> runVertexCentricIteration() on the initialized graph, right?
>
> On 18 March 2015 at 22:33, Mihail Vieru 
> wrote:
>
>>  Hi Vasia,
>>
>> yes, I am using the latest master. I just did a pull again and the
>> problem persists. Perhaps Robert could confirm as well.
>>
>> I've set the solution set to unmanaged in SSSPUnweighted as Stephan
>> proposed and the job finishes. So I am able to proceed using this
>> workaround.
>>
>> An odd thing occurs now though. The distances aren't computed correctly
>> for the SNAP graph and remain the one set in InitVerticesMapper(). For the
>> small graph in SSSPDataUnweighted they are OK. I'm currently investigating
>> this behavior.
>>
>> Cheers,
>> Mihail
>>
>>
>> On 18.03.2015 20:55, Vasiliki Kalavri wrote:
>>
>>  Hi Mihail,
>>
>>  I used your code to generate the vertex file, then gave this and the
>> edge list as input to your SSSP implementation and still couldn't reproduce
>> the exception. I'm using the same local setup as I describe above.
>> I'm not aware of any recent changes that might be relevant, but, just in
>> case, are you using the latest master?
>>
>>  Cheers,
>> V.
>>
>> On 18 March 2015 at 19:21, Mihail Vieru 
>> wrote:
>>
>>>  Hi Vasia,
>>>
>>> I have used a simple job (attached) to generate a file which looks like
>>> this:
>>>
>>> 0 0
>>> 1 1
>>> 2 2
>>> ...
>>> 456629 456629
>>> 456630 456630
>>>
>>> I need the vertices to be generated from a file for my future work.
>>>
>>> Cheers,
>>> Mihail
>>>
>>>
>>>
>>> On 18.03.2015 17:04, Vasiliki Kalavri wrote:
>>>
>>>  Hi Mihail, Robert,
>>>
>>>  I've tried reproducing this, but I couldn't.
>>> I'm using the same twitter input graph from SNAP that you link to and
>>> also Scala IDE.
>>> The job finishes without a problem (both the SSSP example from Gelly and
>>> the unweighted version).
>>>
>>>  The only thing I changed to run your version was creating the graph
>>> from the edge set only, i.e. like this:
>>>
>>>  Graph graph = Graph.fromDataSet(edges,
>>>  new MapFunction() {
>>>  public Long map(Long value) {
>>>  return Long.MAX_VALUE;
>>>  }
>>>  }, env);
>>>
>>> Since the twitter input is an edge list, how do you generate the vertex
>>> dataset in your case?
>>>
>>>  Thanks,
>>> -Vasia.
>>>
>>> On 18 March 2015 at 16:54, Mihail Vieru 
>>> wrote:
>>>
>>>>  Hi,
>>>>
>>>> great! Thanks!
>>>>
>>>> I really need this bug fixed because I'm laying the groundwork for my
>>>> Diplom thesis and I need to be sure that the Gelly API is reliable and can
>>>> handle large datasets as intended.
>>>>
>>>> Cheers,
>>>> Mihail
>>>>
>>>>
>>>> On 18.03.2015 15:40, Robert Waury wrote:
>>>>
>>>>   Hi,
>>>>
>>>>  I managed to reproduce the behavior and as far as I can tell it seems
>>>> to be a problem with the memory allocation.
>>>>
>>>>  I have filed a bug report in JIRA to get the attention of somebody who
>>>> knows the runtime better than I do.
>>>>
>>>> https://issues.apache.org/jira/browse/FLINK-1734
>>>>
>>>>  Cheers,
>>>>  Robert
>>>>
>>>>

Re: RuntimeException Gelly API: Memory ran out. Compaction failed.

2015-03-18 Thread Vasiliki Kalavri
hmm, I'm starting to run out of ideas...
What's your source ID parameter? I ran mine with 0.
About the result, you call both createVertexCentricIteration() and
runVertexCentricIteration() on the initialized graph, right?

On 18 March 2015 at 22:33, Mihail Vieru 
wrote:

>  Hi Vasia,
>
> yes, I am using the latest master. I just did a pull again and the problem
> persists. Perhaps Robert could confirm as well.
>
> I've set the solution set to unmanaged in SSSPUnweighted as Stephan
> proposed and the job finishes. So I am able to proceed using this
> workaround.
>
> An odd thing occurs now though. The distances aren't computed correctly
> for the SNAP graph and remain the one set in InitVerticesMapper(). For the
> small graph in SSSPDataUnweighted they are OK. I'm currently investigating
> this behavior.
>
> Cheers,
> Mihail
>
>
> On 18.03.2015 20:55, Vasiliki Kalavri wrote:
>
>  Hi Mihail,
>
>  I used your code to generate the vertex file, then gave this and the
> edge list as input to your SSSP implementation and still couldn't reproduce
> the exception. I'm using the same local setup as I describe above.
> I'm not aware of any recent changes that might be relevant, but, just in
> case, are you using the latest master?
>
>  Cheers,
> V.
>
> On 18 March 2015 at 19:21, Mihail Vieru 
> wrote:
>
>>  Hi Vasia,
>>
>> I have used a simple job (attached) to generate a file which looks like
>> this:
>>
>> 0 0
>> 1 1
>> 2 2
>> ...
>> 456629 456629
>> 456630 456630
>>
>> I need the vertices to be generated from a file for my future work.
>>
>> Cheers,
>> Mihail
>>
>>
>>
>> On 18.03.2015 17:04, Vasiliki Kalavri wrote:
>>
>>  Hi Mihail, Robert,
>>
>>  I've tried reproducing this, but I couldn't.
>> I'm using the same twitter input graph from SNAP that you link to and
>> also Scala IDE.
>> The job finishes without a problem (both the SSSP example from Gelly and
>> the unweighted version).
>>
>>  The only thing I changed to run your version was creating the graph
>> from the edge set only, i.e. like this:
>>
>>  Graph graph = Graph.fromDataSet(edges,
>>  new MapFunction() {
>>  public Long map(Long value) {
>>  return Long.MAX_VALUE;
>>  }
>>  }, env);
>>
>> Since the twitter input is an edge list, how do you generate the vertex
>> dataset in your case?
>>
>>  Thanks,
>> -Vasia.
>>
>> On 18 March 2015 at 16:54, Mihail Vieru 
>> wrote:
>>
>>>  Hi,
>>>
>>> great! Thanks!
>>>
>>> I really need this bug fixed because I'm laying the groundwork for my
>>> Diplom thesis and I need to be sure that the Gelly API is reliable and can
>>> handle large datasets as intended.
>>>
>>> Cheers,
>>> Mihail
>>>
>>>
>>> On 18.03.2015 15:40, Robert Waury wrote:
>>>
>>>   Hi,
>>>
>>>  I managed to reproduce the behavior and as far as I can tell it seems
>>> to be a problem with the memory allocation.
>>>
>>>  I have filed a bug report in JIRA to get the attention of somebody who
>>> knows the runtime better than I do.
>>>
>>> https://issues.apache.org/jira/browse/FLINK-1734
>>>
>>>  Cheers,
>>>  Robert
>>>
>>> On Tue, Mar 17, 2015 at 3:52 PM, Mihail Vieru <
>>> vi...@informatik.hu-berlin.de> wrote:
>>>
>>>>  Hi Robert,
>>>>
>>>> thank you for your reply.
>>>>
>>>> I'm starting the job from the Scala IDE. So only one JobManager and one
>>>> TaskManager in the same JVM.
>>>> I've doubled the memory in the eclipse.ini settings but I still get the
>>>> Exception.
>>>>
>>>> -vmargs
>>>> -Xmx2048m
>>>> -Xms100m
>>>> -XX:MaxPermSize=512m
>>>>
>>>> Best,
>>>> Mihail
>>>>
>>>>
>>>> On 17.03.2015 10:11, Robert Waury wrote:
>>>>
>>>>   Hi,
>>>>
>>>>  can you tell me how much memory your job has and how many workers you
>>>> are running?
>>>>
>>>>  From the trace it seems the internal hash table allocated only 7 MB
>>>> for the graph data and therefore runs out of memory pretty quickly.
>>>>
>>>>  Skewed data could also be an issue but with a minimum of 5 pages and
>>>

Re: RuntimeException Gelly API: Memory ran out. Compaction failed.

2015-03-18 Thread Vasiliki Kalavri
Hi Mihail,

I used your code to generate the vertex file, then gave this and the edge
list as input to your SSSP implementation and still couldn't reproduce the
exception. I'm using the same local setup as I describe above.
I'm not aware of any recent changes that might be relevant, but, just in
case, are you using the latest master?

Cheers,
V.

On 18 March 2015 at 19:21, Mihail Vieru 
wrote:

>  Hi Vasia,
>
> I have used a simple job (attached) to generate a file which looks like
> this:
>
> 0 0
> 1 1
> 2 2
> ...
> 456629 456629
> 456630 456630
>
> I need the vertices to be generated from a file for my future work.
>
> Cheers,
> Mihail
>
>
>
> On 18.03.2015 17:04, Vasiliki Kalavri wrote:
>
>  Hi Mihail, Robert,
>
>  I've tried reproducing this, but I couldn't.
> I'm using the same twitter input graph from SNAP that you link to and also
> Scala IDE.
> The job finishes without a problem (both the SSSP example from Gelly and
> the unweighted version).
>
>  The only thing I changed to run your version was creating the graph from
> the edge set only, i.e. like this:
>
>  Graph graph = Graph.fromDataSet(edges,
>  new MapFunction() {
>  public Long map(Long value) {
>  return Long.MAX_VALUE;
>  }
>  }, env);
>
> Since the twitter input is an edge list, how do you generate the vertex
> dataset in your case?
>
>  Thanks,
> -Vasia.
>
> On 18 March 2015 at 16:54, Mihail Vieru 
> wrote:
>
>>  Hi,
>>
>> great! Thanks!
>>
>> I really need this bug fixed because I'm laying the groundwork for my
>> Diplom thesis and I need to be sure that the Gelly API is reliable and can
>> handle large datasets as intended.
>>
>> Cheers,
>> Mihail
>>
>>
>> On 18.03.2015 15:40, Robert Waury wrote:
>>
>>   Hi,
>>
>>  I managed to reproduce the behavior and as far as I can tell it seems to
>> be a problem with the memory allocation.
>>
>>  I have filed a bug report in JIRA to get the attention of somebody who
>> knows the runtime better than I do.
>>
>> https://issues.apache.org/jira/browse/FLINK-1734
>>
>>  Cheers,
>>  Robert
>>
>> On Tue, Mar 17, 2015 at 3:52 PM, Mihail Vieru <
>> vi...@informatik.hu-berlin.de> wrote:
>>
>>>  Hi Robert,
>>>
>>> thank you for your reply.
>>>
>>> I'm starting the job from the Scala IDE. So only one JobManager and one
>>> TaskManager in the same JVM.
>>> I've doubled the memory in the eclipse.ini settings but I still get the
>>> Exception.
>>>
>>> -vmargs
>>> -Xmx2048m
>>> -Xms100m
>>> -XX:MaxPermSize=512m
>>>
>>> Best,
>>> Mihail
>>>
>>>
>>> On 17.03.2015 10:11, Robert Waury wrote:
>>>
>>>   Hi,
>>>
>>>  can you tell me how much memory your job has and how many workers you
>>> are running?
>>>
>>>  From the trace it seems the internal hash table allocated only 7 MB for
>>> the graph data and therefore runs out of memory pretty quickly.
>>>
>>>  Skewed data could also be an issue but with a minimum of 5 pages and a
>>> maximum of 8 it seems to be distributed fairly even to the different
>>> partitions.
>>>
>>>  Cheers,
>>>  Robert
>>>
>>> On Tue, Mar 17, 2015 at 1:25 AM, Mihail Vieru <
>>> vi...@informatik.hu-berlin.de> wrote:
>>>
>>>> And the correct SSSPUnweighted attached.
>>>>
>>>>
>>>> On 17.03.2015 01:23, Mihail Vieru wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm getting the following RuntimeException for an adaptation of the
>>>>> SingleSourceShortestPaths example using the Gelly API (see attachment).
>>>>> It's been adapted for unweighted graphs having vertices with Long values.
>>>>>
>>>>> As an input graph I'm using the social network graph (~200MB unpacked)
>>>>> from here: https://snap.stanford.edu/data/higgs-twitter.html
>>>>>
>>>>> For the small SSSPDataUnweighted graph (also attached) it terminates
>>>>> and computes the distances correctly.
>>>>>
>>>>>
>>>>> 03/16/2015 17:18:23IterationHead(WorksetIteration (Vertex-centric
>>>>> iteration
>>>>> (org.apache.flink.graph.library.SingleSourceShortestPathsUnweighted$Vert

Re: RuntimeException Gelly API: Memory ran out. Compaction failed.

2015-03-18 Thread Vasiliki Kalavri
Hi Robert,

my setup has even less memory than your setup, ~900MB in total.

When using the local environment (running the job through your IDE), the
available memory is split equally between the JobManager and the
TaskManager. Then, the default memory reserved for network buffers is
subtracted from the TaskManager's part.
Finally, the TaskManager is assigned 70% (by default) of what is left.
In my case, this was 255MB.
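
As a rough back-of-the-envelope illustration of that split; the network buffer
figure below is the assumed default of 2048 buffers x 32 KB, not a value read
from this particular setup:

public class MemorySplitEstimate {
    public static void main(String[] args) {
        long totalMemory    = 900L * 1024 * 1024;  // ~900 MB available to the local JVM
        long perProcess     = totalMemory / 2;     // split between JobManager and TaskManager
        long networkBuffers = 2048L * 32 * 1024;   // assumed default: 2048 buffers x 32 KB
        long managedMemory  = (long) ((perProcess - networkBuffers) * 0.7);
        // prints roughly 270 MB, the same ballpark as the 255 MB mentioned above
        System.out.println(managedMemory / (1024 * 1024) + " MB");
    }
}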

So, I'm guessing that either the options you're passing to eclipse are not
properly read (I haven't tried it myself) or that there's something wrong
in the way you're generating the graph. That's why I asked how you produce
the vertex dataset.

Cheers,
V.



On 18 March 2015 at 18:27, Robert Waury  wrote:

> Hi Vasia,
>
> How much memory does your job use?
>
> I think the problem is as Stephan says a too conservative allocation but
> that it will work if you throw enough memory at it.
>
> Or did your setup succeed with an amount of memory comparable to Mihail's
> and mine?
>
> My main point is that it shouldn't take 10x more memory than the input
> size for such a job.
>
> Cheers,
> Robert
> On Mar 18, 2015 5:06 PM, "Vasiliki Kalavri" 
> wrote:
>
>> Hi Mihail, Robert,
>>
>> I've tried reproducing this, but I couldn't.
>> I'm using the same twitter input graph from SNAP that you link to and
>> also Scala IDE.
>> The job finishes without a problem (both the SSSP example from Gelly and
>> the unweighted version).
>>
>> The only thing I changed to run your version was creating the graph from
>> the edge set only, i.e. like this:
>>
>> Graph graph = Graph.fromDataSet(edges,
>> new MapFunction() {
>> public Long map(Long value) {
>> return Long.MAX_VALUE;
>> }
>> }, env);
>>
>> Since the twitter input is an edge list, how do you generate the vertex
>> dataset in your case?
>>
>> Thanks,
>> -Vasia.
>>
>> On 18 March 2015 at 16:54, Mihail Vieru 
>> wrote:
>>
>>>  Hi,
>>>
>>> great! Thanks!
>>>
>>> I really need this bug fixed because I'm laying the groundwork for my
>>> Diplom thesis and I need to be sure that the Gelly API is reliable and can
>>> handle large datasets as intended.
>>>
>>> Cheers,
>>> Mihail
>>>
>>>
>>> On 18.03.2015 15:40, Robert Waury wrote:
>>>
>>>   Hi,
>>>
>>>  I managed to reproduce the behavior and as far as I can tell it seems
>>> to be a problem with the memory allocation.
>>>
>>>  I have filed a bug report in JIRA to get the attention of somebody who
>>> knows the runtime better than I do.
>>>
>>> https://issues.apache.org/jira/browse/FLINK-1734
>>>
>>>  Cheers,
>>>  Robert
>>>
>>> On Tue, Mar 17, 2015 at 3:52 PM, Mihail Vieru <
>>> vi...@informatik.hu-berlin.de> wrote:
>>>
>>>>  Hi Robert,
>>>>
>>>> thank you for your reply.
>>>>
>>>> I'm starting the job from the Scala IDE. So only one JobManager and one
>>>> TaskManager in the same JVM.
>>>> I've doubled the memory in the eclipse.ini settings but I still get the
>>>> Exception.
>>>>
>>>> -vmargs
>>>> -Xmx2048m
>>>> -Xms100m
>>>> -XX:MaxPermSize=512m
>>>>
>>>> Best,
>>>> Mihail
>>>>
>>>>
>>>> On 17.03.2015 10:11, Robert Waury wrote:
>>>>
>>>>   Hi,
>>>>
>>>>  can you tell me how much memory your job has and how many workers you
>>>> are running?
>>>>
>>>>  From the trace it seems the internal hash table allocated only 7 MB
>>>> for the graph data and therefore runs out of memory pretty quickly.
>>>>
>>>>  Skewed data could also be an issue but with a minimum of 5 pages and
>>>> a maximum of 8 it seems to be distributed fairly even to the different
>>>> partitions.
>>>>
>>>>  Cheers,
>>>>  Robert
>>>>
>>>> On Tue, Mar 17, 2015 at 1:25 AM, Mihail Vieru <
>>>> vi...@informatik.hu-berlin.de> wrote:
>>>>
>>>>> And the correct SSSPUnweighted attached.
>>>>>
>>>>>
>>>>> On 17.03.2015 01:23, Mihail Vieru wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>&g

Re: RuntimeException Gelly API: Memory ran out. Compaction failed.

2015-03-18 Thread Vasiliki Kalavri
Hi Mihail, Robert,

I've tried reproducing this, but I couldn't.
I'm using the same twitter input graph from SNAP that you link to and also
Scala IDE.
The job finishes without a problem (both the SSSP example from Gelly and
the unweighted version).

The only thing I changed to run your version was creating the graph from
the edge set only, i.e. like this:

Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges,
        new MapFunction<Long, Long>() {
            public Long map(Long value) {
                return Long.MAX_VALUE;
            }
        }, env);

Since the twitter input is an edge list, how do you generate the vertex
dataset in your case?

Thanks,
-Vasia.

On 18 March 2015 at 16:54, Mihail Vieru 
wrote:

>  Hi,
>
> great! Thanks!
>
> I really need this bug fixed because I'm laying the groundwork for my
> Diplom thesis and I need to be sure that the Gelly API is reliable and can
> handle large datasets as intended.
>
> Cheers,
> Mihail
>
>
> On 18.03.2015 15:40, Robert Waury wrote:
>
>   Hi,
>
>  I managed to reproduce the behavior and as far as I can tell it seems to
> be a problem with the memory allocation.
>
>  I have filed a bug report in JIRA to get the attention of somebody who
> knows the runtime better than I do.
>
> https://issues.apache.org/jira/browse/FLINK-1734
>
>  Cheers,
>  Robert
>
> On Tue, Mar 17, 2015 at 3:52 PM, Mihail Vieru <
> vi...@informatik.hu-berlin.de> wrote:
>
>>  Hi Robert,
>>
>> thank you for your reply.
>>
>> I'm starting the job from the Scala IDE. So only one JobManager and one
>> TaskManager in the same JVM.
>> I've doubled the memory in the eclipse.ini settings but I still get the
>> Exception.
>>
>> -vmargs
>> -Xmx2048m
>> -Xms100m
>> -XX:MaxPermSize=512m
>>
>> Best,
>> Mihail
>>
>>
>> On 17.03.2015 10:11, Robert Waury wrote:
>>
>>   Hi,
>>
>>  can you tell me how much memory your job has and how many workers you
>> are running?
>>
>>  From the trace it seems the internal hash table allocated only 7 MB for
>> the graph data and therefore runs out of memory pretty quickly.
>>
>>  Skewed data could also be an issue but with a minimum of 5 pages and a
>> maximum of 8 it seems to be distributed fairly even to the different
>> partitions.
>>
>>  Cheers,
>>  Robert
>>
>> On Tue, Mar 17, 2015 at 1:25 AM, Mihail Vieru <
>> vi...@informatik.hu-berlin.de> wrote:
>>
>>> And the correct SSSPUnweighted attached.
>>>
>>>
>>> On 17.03.2015 01:23, Mihail Vieru wrote:
>>>
 Hi,

 I'm getting the following RuntimeException for an adaptation of the
 SingleSourceShortestPaths example using the Gelly API (see attachment).
 It's been adapted for unweighted graphs having vertices with Long values.

 As an input graph I'm using the social network graph (~200MB unpacked)
 from here: https://snap.stanford.edu/data/higgs-twitter.html

 For the small SSSPDataUnweighted graph (also attached) it terminates
 and computes the distances correctly.


 03/16/2015 17:18:23IterationHead(WorksetIteration (Vertex-centric
 iteration
 (org.apache.flink.graph.library.SingleSourceShortestPathsUnweighted$VertexDistanceUpdater@dca6fe4
 |
 org.apache.flink.graph.library.SingleSourceShortestPathsUnweighted$MinDistanceMessenger@6577e8ce)))(2/4)
 switched to FAILED
 java.lang.RuntimeException: Memory ran out. Compaction failed.
 numPartitions: 32 minPartition: 5 maxPartition: 8 number of overflow
 segments: 176 bucketSize: 217 Overall memory: 20316160 Partition memory:
 7208960 Message: Index: 8, Size: 7
 at
 org.apache.flink.runtime.operators.hash.CompactingHashTable.insert(CompactingHashTable.java:390)
 at
 org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTable(CompactingHashTable.java:337)
 at
 org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:216)
 at
 org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:278)
 at
 org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
 at
 org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:205)
 at java.lang.Thread.run(Thread.java:745)


 Best,
 Mihail

>>>
>>>
>>
>>
>
>


Re: Using Spargel's FilterOnVerices gets stuck.

2015-02-18 Thread Vasiliki Kalavri
Hi Hung,

I am under the impression that circular dependencies like the one you are
describing are not allowed in the Flink execution graph. I would actually
expect something like this to cause an error.

Maybe someone else can elaborate on that?

In any case, the proper way to write iterative programs in Flink is by
using the dedicated iteration operators.
As far as I understand, you want to run an iterative algorithm on a graph,
where you feed the result of one iteration to the next.
Is this the case?

For the moment, if you want to use a Graph API, you can either use Gelly's
runVertexCentricIteration or Spargel.

If your algorithm requires more flexibility, then you can build an
arbitrary dataflow inside an iteration, using Flink's iteration operators.
Take a look at [1] for a description of how to use those and let us know if
you have any doubts.
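
To sketch roughly what such a dataflow can look like on the plain DataSet API,
here is a bulk iteration that does the per-round filtering inside the loop
instead of in a driver-side do-while; the types, the input and the per-round
logic are placeholders, not your actual algorithm:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public class FilterInsideIteration {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input: (vertexId, label) pairs standing in for your vertex type.
        DataSet<Tuple2<Long, Integer>> initialVertices = env.fromElements(
                new Tuple2<>(1L, -1), new Tuple2<>(2L, 0), new Tuple2<>(3L, -1));

        IterativeDataSet<Tuple2<Long, Integer>> loop = initialVertices.iterate(10);

        DataSet<Tuple2<Long, Integer>> remaining = loop
                // ... the per-round logic (your algorithm's joins / vertex-centric step) goes here ...
                .filter(new FilterFunction<Tuple2<Long, Integer>>() {
                    @Override
                    public boolean filter(Tuple2<Long, Integer> v) {
                        return v.f1 == -1;  // same condition as in your FilterVertex
                    }
                });

        // Using 'remaining' as the termination criterion also stops the iteration
        // as soon as the filtered set becomes empty.
        DataSet<Tuple2<Long, Integer>> result = loop.closeWith(remaining, remaining);
        result.print();  // on 0.8.x, add env.execute() since print() is a lazy sink there
    }
}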

Cheers,
V.

[1]:
http://flink.apache.org/docs/0.8/programming_guide.html#iteration-operators


On 18 February 2015 at 20:53, HungChang  wrote:

> Thank you for your reply.
>
> The dataset:
> The 1MB dataset is 38831 nodes and 99565 edges which doesn't get stuck.
> The 30MB dataset is 1,134,890 nodes and 2,987,624 edges which gets stuck.
>
> Our code works like the following logic:
>
> do{
>
> filteredGraph = graph.run(algorithm);
>
> // Get sub-graph for next iteration, where the code gets stuck with 30MB
> dataset
> filteredGraph = filteredGraph.filterOnVertices(new FilterVertex());
>
> }(while filteredGraph.hasNode())
>
>
> filter function is as follow
>
> ---
> public static final class FilterVertex implements
>         FilterFunction<Tuple4<Integer, Integer, Integer, Integer>> {
>
>     @Override
>     public boolean filter(Tuple4<Integer, Integer, Integer, Integer> value)
>             throws Exception {
>         return value.f0 == -1;
>     }
> }
>
>
>
> --
> View this message in context:
> http://apache-flink-incubator-user-mailing-list-archive.2336050.n4.nabble.com/Using-Spargel-s-FilterOnVerices-gets-stuck-tp743p745.html
> Sent from the Apache Flink (Incubator) User Mailing List archive. mailing
> list archive at Nabble.com.
>


Re: Using Spargel's FilterOnVerices gets stuck.

2015-02-18 Thread Vasiliki Kalavri
Hi Hung,

can you share some details on your algorithm and dataset?
I could not reproduce this by just running a filterOnVertices on large
input.

Thank you,
Vasia.

On 18 February 2015 at 19:03, HungChang  wrote:

> Hi,
>
> I have a question about generating the sub-graph using Spargel API.
> We use filterOnVertices to generate it.
> With 30MB edges, the code gets stuck at Join(Join at filterOnVertices)
> With 2MB edges, the code doesn't have this issue.
>
> Log
>
> --
> 02/18/2015 10:34:23:Join(Join at filterOnVertices(Graph.java:615))
> (7/20)
> switched to FINISHED
> 02/18/2015 10:34:23:Join(Join at filterOnVertices(Graph.java:615))
> (12/20)
> switched to FINISHED
> 02/18/2015 10:34:23:Join(Join at filterOnVertices(Graph.java:615))
> (14/20)
> switched to FINISHED
> 02/18/2015 10:34:23:Join(Join at filterOnVertices(Graph.java:615))
> (17/20)
> switched to FINISHED
> 02/18/2015 10:34:23:Join(Join at filterOnVertices(Graph.java:615))
> (20/20)
> switched to FINISHED
> 02/18/2015 10:34:23:Join(Join at filterOnVertices(Graph.java:615))
> (13/20)
> switched to FINISHED
> 02/18/2015 10:34:24:Join(Join at filterOnVertices(Graph.java:615))
> (8/20)
> switched to FINISHED
> 02/18/2015 10:34:24:Join(Join at filterOnVertices(Graph.java:615))
> (2/20)
> switched to FINISHED
> 02/18/2015 10:34:24:Join(Join at filterOnVertices(Graph.java:615))
> (3/20)
> switched to FINISHED
> 02/18/2015 10:34:24:Join(Join at filterOnVertices(Graph.java:615))
> (19/20)
> switched to FINISHED
> 02/18/2015 10:34:24:Join(Join at filterOnVertices(Graph.java:615))
> (16/20)
> switched to FINISHED
>
> It takes more than 10 minutes to continue while other operators complete in
> seconds.
> From the log, it looks like some workers finish and some doesn't.
>
> The Spargel API shows it uses join twice so this operator looks a bit
> expensive.
> Would it be the reason that the job gets stuck?
> Our goal of using filterOnVertices is to use the sub-graph as an input for
> next iteration.
>
>
> --
> public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>> vertexFilter) {
>
>     DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter);
>
>     DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
>             .where(0).equalTo(0)
>             .with(new ProjectEdge())
>             .join(filteredVertices).where(1).equalTo(0)
>             .with(new ProjectEdge());
>
>     return new Graph<K, VV, EV>(filteredVertices, remainingEdges, this.context);
> }
>
> Best regards,
>
> Hung
>
>
>
> --
> View this message in context:
> http://apache-flink-incubator-user-mailing-list-archive.2336050.n4.nabble.com/Using-Spargel-s-FilterOnVerices-gets-stuck-tp743.html
> Sent from the Apache Flink (Incubator) User Mailing List archive. mailing
> list archive at Nabble.com.
>


Re: Can a master class control the superstep in Flink Spargel ?

2015-02-15 Thread Vasiliki Kalavri
Hi,

currently, there is no such built-in master compute class, but you can
easily get the equivalent functionality as follows:

- If your algorithm has a fixed pattern of superstep types, e.g. an
initialization superstep, a main phase and a finalization superstep, then
you can simply chain these types of Spargel iterations together, by feeding
the output of one as input to the next.

For example:

Graph inputGraph = ...
Graph graph1 = inputGraph.runVertexCentricIteration(new UpdateType1(), new MessengerType1(), maxIter1);
Graph graph2 = graph1.runVertexCentricIteration(new UpdateType2(), new MessengerType2(), maxIter2);
Graph graph3 = graph2.runVertexCentricIteration(new UpdateType3(), new MessengerType3(), maxIter3);

and so on.

- If your algorithm needs to switch between superstep types based on some
control flow that is not defined beforehand, you can emulate the master
functionality by using an iteration aggregator. An aggregator can
accumulate values from all vertices during an iteration, and the
aggregated value can then be read at the beginning of the next iteration. This
way you can keep track of the different phases of your algorithm.

For example, your VertexUpdateFunction could look like this:

VertexUpdater extends VertexUpdateFunction {
int phase;
 @Override
public void preSuperstep() {
phase = this.getIterationAggregator("phase.aggregator");
}

@Override
public void updateVertex(key, value, messages) {
if (phase == 1) {
// do type 1 superstep

}
else if (phase ==2) {
// do type 2 superstep

}
else ...
}
}

Cheers,
V.

On 14 February 2015 at 22:59, HungChang  wrote:

> Hi,
>
> Would it be available to control the supersteps in Flink Spargel?
> For example, a master controls the basic graph algorithm having 5 phases
> and
> the master can switch between the phases.
>
> In the given example of Spargel those are send msg and update msg
> sequentially.
> Would it be possible to switch to a specific super step?
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-incubator-user-mailing-list-archive.2336050.n4.nabble.com/Can-a-master-class-control-the-superstep-in-Flink-Spargel-tp733.html
> Sent from the Apache Flink (Incubator) User Mailing List archive. mailing
> list archive at Nabble.com.
>


Re: Multiple sources shortest path

2015-02-15 Thread Vasiliki Kalavri
Hi,

you can certainly use a for-loop like this to run SSSP several times. Just
make sure you return or store the result of the computation for each
source, by adding a data sink e.g.:

for (id : Ids) {
  graph.run(new SingleSourceShortestPaths(id, maxIterations))
  .getVertices().print();
}

However, if you have a large number of source nodes, executing one SSSP for
each of them is probably not the most efficient way to go.
Instead, you could write a custom multiple-source shortest paths program,
where each node calculates distances for multiple sources in each
iteration. In this case, the vertex value could be a vector of size equal
to the number of input sources.
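
To make the vector idea a bit more concrete, here is a minimal sketch of the
per-vertex combine step only; it deliberately leaves out the Gelly iteration
wiring, and all names and values are illustrative:

import java.util.Arrays;

public class MultiSourceDistances {

    // Combine the current distance vector with an incoming candidate vector
    // (e.g. a neighbour's distances plus the edge weight) by taking the
    // element-wise minimum. Index i holds the tentative distance from source i.
    static double[] combine(double[] current, double[] candidate) {
        double[] result = new double[current.length];
        for (int i = 0; i < current.length; i++) {
            result[i] = Math.min(current[i], candidate[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        double[] current   = {0.0, Double.POSITIVE_INFINITY, 5.0};
        double[] candidate = {2.0, 3.0, 4.0};
        System.out.println(Arrays.toString(combine(current, candidate))); // [0.0, 3.0, 4.0]
    }
}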

Cheers,
V.

On 14 February 2015 at 12:26, HungChang  wrote:

> Hi,
>
> In graph api there's an single source shortest path library.
>
> DataSet> singleSourceShortestPaths =
> graph.run(new SingleSourceShortestPaths(srcVertexId,
> maxIterations)).getVertices();
>
> For Multiple Source, would it be possible to run it for all nodes using
> for-loop?
> for example,
>
> for(Node node: nodes){
>  DataSet> singleSourceShortestPaths =
> graph.run(new SingleSourceShortestPaths(node,
> maxIterations)).getVertices();
> }
>
>
>
>
> --
> View this message in context:
> http://apache-flink-incubator-user-mailing-list-archive.2336050.n4.nabble.com/Multiple-sources-shortest-path-tp729.html
> Sent from the Apache Flink (Incubator) User Mailing List archive. mailing
> list archive at Nabble.com.
>


Re: DeltaIterations: shrink solution set

2015-02-10 Thread Vasiliki Kalavri
Hi,

It's hard to tell without details about your algorithm, but what you're
describing sounds to me like something you can use the workset for.
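
For reference, a rough sketch of how a "deleted" flag plus the workset can
emulate removal inside a delta iteration; the types and the verification
condition below are made-up placeholders, not your algorithm:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class TombstoneDeltaIteration {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (id, deleted?) pairs; field 0 is the solution set key.
        DataSet<Tuple2<Long, Boolean>> initial = env.fromElements(
                new Tuple2<>(1L, false), new Tuple2<>(2L, false), new Tuple2<>(3L, false));

        DeltaIteration<Tuple2<Long, Boolean>, Tuple2<Long, Boolean>> iteration =
                initial.iterateDelta(initial, 10, 0);

        // Placeholder "verification": flip the flag to true for elements that fail.
        DataSet<Tuple2<Long, Boolean>> verified = iteration.getWorkset()
                .map(new MapFunction<Tuple2<Long, Boolean>, Tuple2<Long, Boolean>>() {
                    @Override
                    public Tuple2<Long, Boolean> map(Tuple2<Long, Boolean> v) {
                        boolean failed = v.f0 % 2 == 0;  // stand-in for the real check
                        return new Tuple2<>(v.f0, failed);
                    }
                });

        // Failed elements stay in the solution set but carry deleted = true;
        // only the surviving ones are fed into the next workset.
        DataSet<Tuple2<Long, Boolean>> nextWorkset = verified
                .filter(new FilterFunction<Tuple2<Long, Boolean>>() {
                    @Override
                    public boolean filter(Tuple2<Long, Boolean> v) {
                        return !v.f1;
                    }
                });

        DataSet<Tuple2<Long, Boolean>> result = iteration.closeWith(verified, nextWorkset);
        result.print();  // on 0.8.x, add env.execute() since print() is a lazy sink there
    }
}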

-V.
On Feb 10, 2015 6:54 PM, "Alexander Alexandrov" <
alexander.s.alexand...@gmail.com> wrote:

> I am not sure whether this is supported at the moment. The only workaround
> I could think of is indeed to use a boolean flag that indicates whether the
> element has been deleted or not.
>
> An alternative approach is to ditch Flink's native iteration construct and
> write your intermediate results to Tachyon or HDFS after each iteration
> using the TypeInfoInput/OutputFormats. You then have full control how the
> old and the new solutions sets should be merged.
>
> BTW can you share some details about that particular algorithm? I was
> thinking about examples iterative algorithms with this property...
>
> Regards,
> A.
>
>
> 2015-02-10 14:18 GMT+01:00 Kruse, Sebastian :
>
>>  Hi everyone,
>>
>>
>>
>> From playing around a bit around with delta iterations, I saw that you
>> can update elements from the solution set and add new elements. My question
>> is: is it possible to remove elements from the solution set (apart from
>> marking them as “deleted” somehow)?
>>
>>
>>
>> My use case at hand for this is the following: In each iteration, I
>> generate candidate solutions that I want to verify within the next
>> iteration. If verification fails, I would like to remove them from the
>> solution set, otherwise retain them.
>>
>>
>>
>> Thanks,
>>
>> Sebastian
>>
>
>


Re: flink loop

2015-02-05 Thread Vasiliki Kalavri
Hi,

I'm not familiar with the particular algorithm, but you can most probably
use one of the two iterate operators in Flink.

You can read a description and see some examples in the documentation:
http://flink.apache.org/docs/0.8/programming_guide.html#iteration-operators
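
As a very rough sketch of the shape this usually takes with the bulk iteration
operator; the map and reduce bodies below are placeholders standing in for your
candidate generation and frequent-itemset counting, not an Apriori
implementation:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public class LoopXTimes {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> initial = env.fromElements(
                new Tuple2<>("a", 1), new Tuple2<>("b", 1));

        int x = 5;  // the predefined number of rounds
        IterativeDataSet<Tuple2<String, Integer>> loop = initial.iterate(x);

        // One round: "generate candidates" (map) followed by "count/aggregate" (reduce).
        DataSet<Tuple2<String, Integer>> nextRound = loop
                .map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Tuple2<String, Integer> t) {
                        return new Tuple2<>(t.f0, t.f1 + 1);  // placeholder logic
                    }
                })
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                                          Tuple2<String, Integer> b) {
                        return new Tuple2<>(a.f0, a.f1 + b.f1);  // placeholder logic
                    }
                });

        // The output of each round automatically becomes the input of the next one.
        loop.closeWith(nextRound).print();  // on 0.8.x, add env.execute()
    }
}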

Let us know if you have any questions!

Cheers,
V.

On 5 February 2015 at 20:37, tanguy racinet  wrote:

> Hi,
>
> We are trying to develop the Apriori algorith with the Flink for our Data
> minning project.
> In our understanding, Flink could handle loop within the workflow.
> However, our knowledge is limited and we cannot find a nice way to do it.
>
> Here is the flow of my algorithm :
> GenerateCandidates > CalculateFrequentItemSet
> mapper  > reducer
>
> We would like to use the reducer result as the mapper's entry for a
> predefined number of times (loop x times).
>
> Is there any smart way to that with Flink. Or should we just copy paste
> the loop x times ?
>
> Thank you,
> 
>
>
> *Racinet Tanguy*
>
> *EIT ICT Labs Master School Student*
> *Distributed Systems and Services*
>
> Tel : +33 6 63 20 89 16 / +49 176 3749 8854
> Mail : tanraci...@gmail.com
>
> ᐧ
>