Re: Start streaming tuples depending on another streams rate

2017-02-09 Thread Tzu-Li (Gordon) Tai
Hi Jonas,

A few things to clarify first:

Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, 
the rate drops to 10 tuples/s.

From this description it seems like the job is re-reading the topic from the 
beginning, and once it reaches the latest record at the head of the queue, 
you start getting the normal input rate again, correct?

What I now want is that while tuples from A are being processed in flatMap1, 
the stream B in flatMap2 should wait until the rate of the A stream has dropped, 
and only then should flatMap2 be called.

So what you are looking for is that flatMap2 for stream B should only start 
doing work after the job reaches the latest record in stream A?

If that’s the case, I would not rely on detecting a drop below a threshold 
rate value. That isn’t reliable, because it depends on stream A’s actual input 
rate, which, like any stream, naturally changes over time.

I’m not sure if it’s the best solution, but this is what I have in mind:
You could insert a special marker event into stream A every time you 
start running this job.
Your job can have an operator before your co-flatMap operator that expects this 
special marker; when it receives it (which is when the head of stream A has 
been reached), it broadcasts a special event to the co-flatMap for flatMap2 to 
process.
Then, once flatMap2 is invoked with the special event, you can toggle logic in 
flatMap2 to actually start doing stuff.
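
A rough sketch of what that gated co-flatMap could look like (the event types 
here are made up, and the buffer is kept in a plain field for brevity; in a 
real job you would keep it in Flink state so it survives failures):

import scala.collection.mutable.ArrayBuffer
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

// Hypothetical event types; AEvent.isMarker flags the special marker event.
case class AEvent(id: String, isMarker: Boolean)
case class BEvent(id: String)

class GatedCoFlatMap extends CoFlatMapFunction[AEvent, BEvent, String] {

  private var headReached = false
  private val pendingB = ArrayBuffer[BEvent]()

  override def flatMap1(a: AEvent, out: Collector[String]): Unit = {
    if (a.isMarker) {
      // Head of stream A reached: open the gate and flush the buffered B tuples.
      headReached = true
      pendingB.foreach(b => out.collect(b.id))
      pendingB.clear()
    } else {
      out.collect(a.id) // normal processing of stream A
    }
  }

  override def flatMap2(b: BEvent, out: Collector[String]): Unit = {
    if (headReached) out.collect(b.id)
    else pendingB += b // hold back B until the marker has arrived
  }
}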

Cheers,
Gordon
On February 9, 2017 at 8:09:33 PM, Jonas (jo...@huntun.de) wrote:

Hi! I have a job that uses a RichCoFlatMapFunction of two streams: A and B.
A.connect(B)
  .keyBy(_.id, _.id)
  .flatMap(new MyOp)
In MyOp, the A stream tuples are combined to form a state using a 
ValueStateDescriptor. Stream A is usually started from the beginning of a Kafka 
topic. Stream A has a rate of 100k tuples/s. After processing the whole Kafka 
queue, the rate drops to 10 tuples/s. A big drop. What I now want is that while 
tuples from A are being processed in flatMap1, the stream B in flatMap2 should 
wait until the rate of the A stream has dropped, and only then should flatMap2 
be called. Ideally, this behaviour would be captured in a separate operator, 
like a RateBasedStreamValve or something like that :) To solve this, my idea is 
to add a counter/timer in the RichCoFlatMapFunction that counts how many tuples 
have been processed from A. If the rate drops below a threshold (here maybe 15 
tuples/s), flatMap2, which processes tuples from B, empties the buffer. However, 
this would make my RichCoFlatMapFunction much bigger and would not allow for 
operator reuse in other scenarios. I'm of course happy to answer if something 
is unclear. -- Jonas


Re: Where to put "pre-start" logic and how to detect recovery?

2017-02-09 Thread Tzu-Li (Gordon) Tai
Hi Dmitry,

I think currently the simplest way to do this is to add a program argument 
as a flag for whether or not the current run is from a savepoint (so you 
manually supply the flag whenever you’re starting the job from a savepoint), 
and check that flag in the main method.
The main method will only be executed at the client once on every job submit,
and not on automatic job restarts from checkpoints due to failures.
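
A minimal sketch of that pattern (the recreateKafkaTopic() helper is 
hypothetical; plug in whatever topic admin code you already have):

import org.apache.flink.api.java.utils.ParameterTool

object MyJob {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    // Pass --fromSavepoint true when restoring from a savepoint;
    // omit the flag when starting the job in "clean" mode.
    val fromSavepoint = params.getBoolean("fromSavepoint", false)
    if (!fromSavepoint) {
      recreateKafkaTopic() // hypothetical helper that drops and re-creates the topic
    }
    // ... build and execute the streaming job as usual
  }

  def recreateKafkaTopic(): Unit = ??? // your topic admin code goes here
}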

Cheers,
Gordon

On February 9, 2017 at 11:08:54 PM, Dmitry Golubets (dgolub...@gmail.com) wrote:

Hi,

I need to re-create a Kafka topic when a job is started in "clean" mode.
I can do it, but I'm not sure if I'm doing it in the right place.

Is it fine to put this kind of code in the "main"? Then it's called on every 
job submit.
But how do I detect if a job is being started from a savepoint?

Or is there a different approach?

Best regards,
Dmitry

"Job submission to the JobManager timed out" on EMR YARN cluster with multiple jobs

2017-02-09 Thread Geoffrey Mon
Hello all,

I'm running a Flink plan made up of multiple jobs. The source for my job
can be found here if it would help in any way:
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
Each of the jobs (except for the first job) depends on files generated by
the previous job; I'm running it on an AWS EMR cluster using YARN.

When I submit the plan file, the first job runs as planned. After it
completes, the second job is submitted by the YARN client:


02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
02/09/2017 16:39:43 Job execution switched to status FINISHED.
2017-02-09 16:40:26,470 INFO  org.apache.flink.yarn.YarnClusterClient
- Waiting until all TaskManagers have connected
Waiting until all TaskManagers have connected
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient
- TaskManager status (5/5)
TaskManager status (5/5)
2017-02-09 16:40:26,476 INFO  org.apache.flink.yarn.YarnClusterClient
- All TaskManagers are connected
All TaskManagers are connected
2017-02-09 16:40:26,480 INFO  org.apache.flink.yarn.YarnClusterClient
- Submitting job with JobID:
b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for
job completion.
Connected to JobManager at Actor[akka.tcp://flink@
.ec2.internal:35598/user/jobmanager#68430682]

If the input file is small and the first job runs quickly (~1 minute works
for me), then the second job runs fine. However, if the input file for my
first job is large and the first job takes more than a minute or so to
complete, Flink will not acknowledge receiving the next job; the Flink web
console does not show any new jobs, and the Flink logs do not mention receiving
any new jobs after the first job has completed. The YARN client's job
submission then times out because Flink does not respond:

Caused by:
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
Job submission to the JobManager timed out. You may increase
'akka.client.timeout' in case the JobManager needs more time to configure
and confirm the job submission.
at
org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at
org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
at
org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
at
org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)

I have tried increasing akka.client.timeout to large values such as 1200s
(20 minutes), set in flink-conf.yaml as shown below, but even then Flink does
not acknowledge or execute any other jobs and the same timeout error occurs.
Does anyone know how I can get Flink to execute all of the jobs properly?
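
# flink-conf.yaml (on the client side); the exact value to use depends on your setup
akka.client.timeout: 1200 s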

Cheers,
Geoffrey Mon


Re: Flink 1.2 Maven dependency

2017-02-09 Thread Yassine MARZOUGUI
Hi,

I could find the dependency here:
https://search.maven.org/#artifactdetails%7Corg.apache.flink%7Cflink-core%7C1.2.0%7Cjar
I wonder why it still doesn't show up on
http://mvnrepository.com/artifact/org.apache.flink/flink-core.

The dependency version for Flink 1.2 is 1.2.0:


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <version>1.2.0</version>
</dependency>


Best,
Yassine


On Feb 9, 2017 20:39, "Dominik Safaric"  wrote:

Hi,

I’ve been trying to use the Flink 1.2 Maven dependency, but unfortunately I
was not able to retrieve it.

In addition, I cannot find the 1.2 version on the repository website either
(e.g. Flink core http://mvnrepository.com/artifact/org.apache.flink/flink-core).

Could someone explain why there isn’t a Maven dependency available yet?

Thanks,
Dominik


Flink 1.2 Maven dependency

2017-02-09 Thread Dominik Safaric
Hi,

I’ve been trying to use the Flink 1.2 Maven dependency, but unfortunately I was 
not able to retrieve it. 

In addition, I cannot find the 1.2 version on the repository website either 
(e.g. Flink core http://mvnrepository.com/artifact/org.apache.flink/flink-core).

Could someone explain why there isn’t a Maven dependency available yet? 

Thanks, 
Dominik



Re: Questions about the V-C Iteration in Gelly

2017-02-09 Thread Xingcan Cui
Hi Vasia,

thanks for your reply. It helped a lot and I got some new ideas.

a) As you said, I did use the getPreviousIterationAggregate() method in
preSuperstep() of the next superstep. However, if the (only?) global
(aggregate) results cannot be guaranteed to be consistent, what should we
do with the postSuperstep() method?

b) Though we can activate vertices via the update method or via messages, IMO
it may be more proper for users themselves to decide when to halt a vertex's
iteration. Consider a complex algorithm that contains different phases inside
a vertex-centric iteration. Before moving to the next phase (which should be
synchronized), some vertices may have already finished their work in the
current phase and just wait for the others. Users may prefer to let the
finished vertices idle until the next phase rather than halt them.
Can we consider adding a voteToHalt() method and some internal variables
to the Vertex/Edge classes (or just create an "advanced" version of them)
to make halting more controllable?

c) Sorry that I didn't make it clear before. Here the initialization means
a "global" one that executes once before the iteration. For example, users
may want to initialize the vertices' values from their adjacent edges before
the iteration starts. Maybe we could add an extra coGroupFunction to the
configuration parameters and apply it before the iteration?
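
(For what it's worth, such an init can already be emulated outside the
iteration with the existing API. A rough sketch in Scala, assuming Long
vertex and edge values, that sums each vertex's adjacent edge values into
its value before the iteration runs:)

import java.lang.{Long => JLong}
import org.apache.flink.graph.{EdgeDirection, Graph, ReduceEdgesFunction, VertexJoinFunction}

object InitFromEdges {
  def apply(graph: Graph[JLong, JLong, JLong]): Graph[JLong, JLong, JLong] = {
    // Sum the values of each vertex's adjacent edges...
    val sums = graph.reduceOnEdges(new ReduceEdgesFunction[JLong] {
      override def reduceEdges(first: JLong, second: JLong): JLong = first + second
    }, EdgeDirection.ALL)
    // ...and write the sums into the vertex values before iterating.
    graph.joinWithVertices(sums, new VertexJoinFunction[JLong, JLong] {
      override def vertexJoin(vertexValue: JLong, edgeSum: JLong): JLong = edgeSum
    })
  }
}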

What do you think?

(BTW, I opened a PR for FLINK-1526 (MST lib & example). Considering the
complexity, the example is not provided.)

I really appreciate all your help.

Best,
Xingcan

On Thu, Feb 9, 2017 at 5:36 PM, Vasiliki Kalavri 
wrote:

> Hi Xingcan,
>
> On 7 February 2017 at 10:10, Xingcan Cui  wrote:
>
>> Hi all,
>>
>> I got some questions about the vertex-centric iteration in Gelly.
>>
>> a) It seems the postSuperstep method is called before the superstep
>> barrier (I got different aggregate values for the same superstep in this
>> method). Is this a bug? Or is the design just like that?
>>
>
> The postSuperstep() method is called inside the close() method of a
> RichCoGroupFunction that wraps the ComputeFunction. The close() method is
> called after the last call to coGroup() in each iteration superstep.
> The aggregate values are not guaranteed to be consistent during the same
> superstep in which they are computed. To retrieve an aggregate value for
> superstep i, you should use the getPreviousIterationAggregate() method in
> superstep i+1.
>
>
>>
>> b) There is no setHalt method for vertices. When no message is received, a
>> vertex just quits the next iteration. Should I manually send messages (like
>> heartbeats) to keep the vertices active?
>>
>
> That's because vertex halting is implicitly controlled by the underlying
> delta iterations of Flink. A vertex will remain active as long as it
> receives a message or updates its value; otherwise it will become
> inactive. The documentation on Gelly iterations [1] and DataSet iterations
> [2] might be helpful.
>
>
>
>>
>> c) I think we may need an initialization method in the ComputeFunction.
>>
>
>
> There exists a preSuperstep() method for initialization. This one will be
> executed once per superstep, before the compute function is invoked for
> every vertex. Would this work for you?
>
>
>
>>
>> Any opinions? Thanks.
>>
>> Best,
>> Xingcan
>>
>>
>>
> I hope this helps,
> -Vasia.
>
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/gelly/iterative_graph_processing.html#vertex-centric-iterations
> [2]: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/iterations.html
>
>


Re: Remove old <1.0 documentation from google

2017-02-09 Thread Ufuk Celebi
OK, I've added this issue here: https://issues.apache.org/jira/browse/FLINK-5764

I have time to address it next week.


Re: Remove old <1.0 documentation from google

2017-02-09 Thread Jonas
Doesn't Google offer wildcard removal?





Where to put "pre-start" logic and how to detect recovery?

2017-02-09 Thread Dmitry Golubets
Hi,

I need to re-create a Kafka topic when a job is started in "clean" mode.
I can do it, but I'm not sure if I'm doing it in the right place.

Is it fine to put this kind of code in the "main"? Then it's called on every 
job submit.
But how do I detect if a job is being started from a savepoint?

Or is there a different approach?

Best regards,
Dmitry


Re: Remove old <1.0 documentation from google

2017-02-09 Thread Ufuk Celebi
@Jonas: We added that, but as Greg said some docs are apparently not updated 
automatically (see here https://github.com/apache/flink/pull/3242).

As mentioned in the linked PR, I agree that we should consider removing 
everything < 1.0.0 and redirecting the older URLs to the latest stable release.

@David, Stephan: What do you think about this?

We didn't do it because we weren't sure what the redirects would mean for the 
general page rank of the docs, but I think this is not maintainable as it is 
right now.

On 9 February 2017 at 15:13:53, Greg Hogan (c...@greghogan.com) wrote:
> See FLINK-5575.
> https://issues.apache.org/jira/browse/FLINK-5575
>  
> Looks like release-0.8 and older are not automatically rebuilt.
> https://ci.apache.org/builders/
>  
> On Thu, Feb 9, 2017 at 7:17 AM, Jonas wrote:
>  
> > Maybe add "This documentation is outdated. Please switch to a newer version
> > by clicking here ".



Re: Remove old <1.0 documentation from google

2017-02-09 Thread Greg Hogan
See FLINK-5575.
  https://issues.apache.org/jira/browse/FLINK-5575

Looks like release-0.8 and older are not automatically rebuilt.
  https://ci.apache.org/builders/

On Thu, Feb 9, 2017 at 7:17 AM, Jonas  wrote:

> Maybe add "This documentation is outdated. Please switch to a newer version
> by clicking here ".


Re: #kafka/#ssl: how to enable ssl authentication for a new kafka consumer?

2017-02-09 Thread alex.decastro
Cool, thanks. Just checked it.

One last question:

If the server hosting my Kafka broker has only SSL enabled, but not SASL
(Kerberos), how do I go about enabling connection authentication between the
client consumer and the broker?

And the same for data transfer?
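
For context, I imagine something along these lines (a sketch with the Kafka
0.9 consumer; broker address, paths and passwords are placeholders):

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object SslKafkaSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "broker:9093")
    props.setProperty("group.id", "my-group")
    // SSL-only setup (no SASL/Kerberos): standard Kafka 0.9+ client settings
    props.setProperty("security.protocol", "SSL")
    props.setProperty("ssl.truststore.location", "/path/to/truststore.jks")
    props.setProperty("ssl.truststore.password", "secret")
    // Only needed if the broker requires client (mutual) authentication:
    props.setProperty("ssl.keystore.location", "/path/to/keystore.jks")
    props.setProperty("ssl.keystore.password", "secret")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new FlinkKafkaConsumer09[String]("my-topic", new SimpleStringSchema(), props))
      .print()
    env.execute("ssl kafka sketch")
  }
}

(As far as I understand, the same SSL settings also give you encryption of the
data transfer itself.)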





Flink 1.2 and Cassandra Connector

2017-02-09 Thread Nico
Hi,

I would like to upgrade to the new stable version 1.2 - but I get a
NoClassDefFoundError when I start the application.

Caused by: java.lang.NoClassDefFoundError: com/codahale/metrics/Metric
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1367)
at com.datastax.driver.core.Cluster.init(Cluster.java:162)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
at
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
at
org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink.open(CassandraTupleSink.java:42)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)


So I think the Cassandra connector is the reason for it. Moreover, I don't
see a version 1.2 of the connector in the Maven repository, as mentioned in
the doc:


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-cassandra_2.10</artifactId>
  <version>1.2.0</version>
</dependency>


Is there a plan to release a new version?

Best,
Nico


Re: #kafka/#ssl: how to enable ssl authentication for a new kafka consumer?

2017-02-09 Thread Robert Metzger
I've added another answer on SO that explains how you can pass a custom
configuration object to the execution environment.
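
In short, something along these lines (a sketch; the Kerberos keys are just an
example, any flink-conf.yaml entry can be set this way for local runs):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object LocalRunSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Values that would otherwise live in flink-conf.yaml:
    conf.setString("security.kerberos.login.keytab", "/path/to/user.keytab")
    conf.setString("security.kerberos.login.principal", "flink-user")
    // Local environment (e.g. when running from the IDE) using the custom config:
    val env = StreamExecutionEnvironment.createLocalEnvironment(1, conf)
    // ... define and execute your job on `env` as usual
  }
}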

On Thu, Feb 9, 2017 at 11:09 AM, alex.decastro 
wrote:

> I found a similar question and answer at #stackoverflow
> http://stackoverflow.com/questions/37743194/local-flink-config-running-standalone-from-ide
>
> Verify?


Re: Remove old <1.0 documentation from google

2017-02-09 Thread Jonas
Maybe add "This documentation is outdated. Please switch to a newer version
by clicking here ".





Re: How about Discourse (https://www.discourse.org/) for this mailing list

2017-02-09 Thread Jonas
I might want to add that although these two are available, the content of the
submissions is still often unreadable and not properly formatted. At least
for me this is annoying to read. Additionally, we have Stack Overflow, which
has a nice UI for editing but is not really good for discussions.





Start streaming tuples depending on another streams rate

2017-02-09 Thread Jonas
Hi! I have a job that uses a RichCoFlatMapFunction of two streams: A and B.
In MyOp, the A stream tuples are combined to form a state using a
ValueStateDescriptor. Stream A is usually started from the beginning of a
Kafka topic. Stream A has a rate of 100k tuples/s. After processing the
whole Kafka queue, the rate drops to 10 tuples/s. A big drop. What I now want
is that while tuples from A are being processed in flatMap1, the stream B in
flatMap2 should wait until the rate of the A stream has dropped, and only
then should flatMap2 be called. Ideally, this behaviour would be captured in
a separate operator, like a RateBasedStreamValve or something like that :)
To solve this, my idea is to add a counter/timer in the
RichCoFlatMapFunction that counts how many tuples have been processed from
A. If the rate drops below a threshold (here maybe 15 tuples/s), flatMap2,
which processes tuples from B, empties the buffer. However, this would make
my RichCoFlatMapFunction much bigger and would not allow for operator reuse
in other scenarios. I'm of course happy to answer if something is unclear.

-- Jonas




Remove old <1.0 documentation from google

2017-02-09 Thread Jonas
Hi!

It's really annoying that when you search for something Flink-related, you
often get old documentation from Google. Example: Google "flink quickstart
scala" and you get
https://ci.apache.org/projects/flink/flink-docs-release-0.8/scala_api_quickstart.html






tasks running in parallel beyond configured parallelism/slots

2017-02-09 Thread Antony Mayi
Hi,
I am new to Flink and a bit confused about the execution pipeline of my Flink 
job. I run it on a cluster of three task managers (Flink 1.1.2), each configured 
with just a single slot. I submit my job with parallelism set to 3.
This is the global plan (low res - just to show the initial forking): 
http://pasteboard.co/weyMrFlZl.png
This is a detail of the front part: http://pasteboard.co/wez3DVvfW.png
My confusion is: how come all the parallel operations in the second column (10 
operations) are being executed at the same time if there should only be capacity 
for a max of 3 running at once? Also, they are mostly all executed on the same 
node while the others are idle.
Thanks for anything useful,
Antony

Re: #kafka/#ssl: how to enable ssl authentication for a new kafka consumer?

2017-02-09 Thread alex.decastro
I found a similar question and answer at #stackoverflow 
http://stackoverflow.com/questions/37743194/local-flink-config-running-standalone-from-ide

Verify? 





Re: #kafka/#ssl: how to enable ssl authentication for a new kafka consumer?

2017-02-09 Thread alex.decastro
Thanks Robert.
As a beginner Flinker, how do I tell my Flink app (in IntelliJ, say) where the
flink-conf.yaml is?

Alex







Re: #kafka/#ssl: how to enable ssl authentication for a new kafka consumer?

2017-02-09 Thread Robert Metzger
Check out the documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#enabling-kerberos-authentication-for-versions-09-and-above-only

On Wed, Feb 8, 2017 at 4:40 PM, alex.decastro 
wrote:

> Dear flinkers,
> I'm consuming from a Kafka broker on a server that has SSL authentication
> enabled. How do I configure my consumer to comply with it?
>
> Many thanks
> Alex


Re: Questions about the V-C Iteration in Gelly

2017-02-09 Thread Vasiliki Kalavri
Hi Xingcan,

On 7 February 2017 at 10:10, Xingcan Cui  wrote:

> Hi all,
>
> I got some questions about the vertex-centric iteration in Gelly.
>
> a) It seems the postSuperstep method is called before the superstep
> barrier (I got different aggregate values for the same superstep in this
> method). Is this a bug? Or is the design just like that?
>

The postSuperstep() method is called inside the close() method of a
RichCoGroupFunction that wraps the ComputeFunction. The close() method is
called after the last call to coGroup() in each iteration superstep.
The aggregate values are not guaranteed to be consistent during the same
superstep in which they are computed. To retrieve an aggregate value for
superstep i, you should use the getPreviousIterationAggregate() method in
superstep i+1.
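
For example, a minimal sketch (assuming a LongSumAggregator registered under
the name "sum" on the VertexCentricConfiguration that is passed to
runVertexCentricIteration()):

import java.lang.{Long => JLong}
import org.apache.flink.api.common.aggregators.LongSumAggregator
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.pregel.{ComputeFunction, MessageIterator}
import org.apache.flink.types.LongValue

class CountingCompute extends ComputeFunction[JLong, JLong, JLong, JLong] {

  private var previousCount = 0L

  // Runs once per superstep, before compute() is invoked for any vertex.
  override def preSuperstep(): Unit = {
    // The aggregate of superstep i is only consistent when read in superstep i+1.
    val agg: LongValue = getPreviousIterationAggregate("sum")
    previousCount = if (agg != null) agg.getValue else 0L
  }

  override def compute(vertex: Vertex[JLong, JLong], messages: MessageIterator[JLong]): Unit = {
    // Contribute to the current superstep's aggregate.
    getIterationAggregator[LongSumAggregator]("sum").aggregate(1L)
    setNewVertexValue(vertex.getValue + previousCount)
    sendMessageToAllNeighbors(vertex.getValue) // keep the vertices active
  }
}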


>
> b) There is no setHalt method for vertices. When no message is received, a
> vertex just quits the next iteration. Should I manually send messages (like
> heartbeats) to keep the vertices active?
>

That's because vertex halting is implicitly controlled by the underlying
delta iterations of Flink. A vertex will remain active as long as it
receives a message or updates its value; otherwise it will become
inactive. The documentation on Gelly iterations [1] and DataSet iterations
[2] might be helpful.



>
> c) I think we may need an initialization method in the ComputeFunction.
>


There exists a preSuperstep() method for initialization. This one will be
executed once per superstep, before the compute function is invoked for
every vertex. Would this work for you?



>
> Any opinions? Thanks.
>
> Best,
> Xingcan
>
>
>
I hope this helps,
-Vasia.


[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/gelly/iterative_graph_processing.html#vertex-centric-iterations
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/iterations.html