Re: Turn lazy operator execution off for streaming jobs

2015-01-21 Thread Gyula Fóra
Thank you! I will play around with it.

On Wed, Jan 21, 2015 at 3:50 PM, Ufuk Celebi u...@apache.org wrote:

 Hey Gyula,

 On 21 Jan 2015, at 15:41, Gyula Fóra gyf...@apache.org wrote:

  Hey Guys,
 
  I think it would make sense to turn lazy operator execution off for
  streaming programs because it would make life simpler for windowing. I
  also created a JIRA issue here:
  https://issues.apache.org/jira/browse/FLINK-1425.
 
  Can anyone give me some quick pointers on how to do this? It's probably
  simple, I am just not familiar with that part of the code. (Or maybe it's
  so easy that someone could pick this up :) )

 Have a look at the JobManager ScheduleOrUpdateConsumers message, which is
 how it is done currently. The first produced buffer of an intermediate
 result triggers this message. I think the cleanest solution would be to do
 this directly when scheduling a streaming job?

  By the way, do you see any reasons why we should not do this?

 ATM, I don't.


Very strange behaviour of groupBy() -> sort() -> first()

2015-01-21 Thread Felix Neutatz
Hi,

my use case is the following:

I have a Tuple2<String, Long>. I want to group by the String and sum up the
Long values accordingly. This works fine with these lines:

DataSet<Lineitem> lineitems = getLineitemDataSet(env);
lineitems.project(new int[]{3, 0}).groupBy(0).aggregate(Aggregations.SUM,
1);

After the aggregation I want to print the 10 groups with the highest sum,
like:

string1, 100L
string2, 50L
string3, 1L

I tried that:

lineitems.project(new int[]{3, 0}).groupBy(0).aggregate(Aggregations.SUM,
1).groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print();

But instead of 3 records, I get a lot more.

Can you see my error?

Best regards,

Felix


Re: [flink-streaming] Regarding loops in the Job Graph

2015-01-21 Thread Stephan Ewen
Hi Paris!

The Streaming API allows you to define iterations, where parts of the
stream are fed back. Do those work for you?

In general, cyclic flows are a tricky thing: the topological order of
operators is needed for scheduling (which may not be important for
continuous streams), but also for a clear producer/consumer relationship,
which is important for fault tolerance techniques.

Currently, the JobManager topologically sorts the job graph and starts
scheduling operators. I am surprised to hear that a graph with cyclic
dependencies works...


Stephan


On Wed, Jan 21, 2015 at 2:57 AM, Paris Carbone par...@kth.se wrote:

 Hello,

 While implementing the SAMOA adapter for Flink-Streaming we stumbled upon
 the need to allow loops (or circular dependencies) in the job graph. Many
 incremental machine learning tasks already define loops, and there is no
 trivial way of getting around them. The streaming job graph builder
 currently has a check that does not allow the user to submit graphs with
 loops; however, from what Gyula told me, if the check is removed the
 streaming job runs as expected. Is there (still) a major reason for having
 this check, at least in the streaming component?

 Paris


[jira] [Created] (FLINK-1430) Add test for streaming scala api completeness

2015-01-21 Thread JIRA
Márton Balassi created FLINK-1430:
-

 Summary: Add test for streaming scala api completeness
 Key: FLINK-1430
 URL: https://issues.apache.org/jira/browse/FLINK-1430
 Project: Flink
  Issue Type: Test
  Components: Streaming
Affects Versions: 0.9
Reporter: Márton Balassi


Currently, the completeness of the streaming Scala API is not tested.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Turn lazy operator execution off for streaming jobs

2015-01-21 Thread Stephan Ewen
I think this is a fairly delicate thing.

The execution graph / scheduling is the most sensitive part of the system. I
would not feel too good about a quick fix there, so let's think this
through a little bit.

The logic currently does the following:

1) It schedules the sources (see ExecutionGraph.scheduleForExecution())

2) Successors of operators are scheduled when the intermediate result
partition / queue tells the master that data is available.

3) The successor requests the stream from the producer.


Possible changes:
 - We could definitely change the ExecutionGraph.scheduleForExecution()
method to deploy all tasks immediately. I would suggest to attach a schedule
mode to the JobGraph that defines how to do that. The mode could have the
values FROM_SOURCES, ALL, and BACK_TRACKING: FROM_SOURCES is what we do
right now, BACK_TRACKING is what we will do in the next release, and ALL is
what you need (see the sketch below).

 - When all tasks are scheduled immediately, it may be that for a channel,
the sender is not yet deployed when the receiver is deployed. That should
be okay, since the same can happen right now when all-to-all patterns
connect the tasks.

 - The queues would still send notifications to the JobManager that data is
available, but the JM will see that the target task is already deployed (or
currently being deployed). Then the information about where to grab a
channel from would need to be sent to the task. That mechanism also exists
already.
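
To make the schedule-mode idea concrete, a rough sketch (names and
structure are illustrative, not the actual scheduler internals):

// Illustrative sketch only; the enum and dispatch are hypothetical.
public class SchedulingSketch {

    public enum ScheduleMode {
        FROM_SOURCES,  // current behaviour: deploy sources, successors on demand
        BACK_TRACKING, // planned: resume from available intermediate results
        ALL            // deploy every task immediately (what streaming needs)
    }

    private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;

    public void scheduleForExecution() {
        switch (scheduleMode) {
            case FROM_SOURCES:
                // deploy only source vertices; successors are deployed when
                // the first buffer of an intermediate result is available
                break;
            case ALL:
                // deploy all vertices right away; a receiver may come up
                // before its sender, as with all-to-all patterns today
                break;
            case BACK_TRACKING:
                // future work: walk back from sinks to available results
                break;
        }
    }
}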


@Ufuk: It seems that it may actually work to simply kick off the deployment
of all tasks immediately (in the ExecutionGraph.scheduleForExecution()
method). Do you see any other implications?

Greetings,
Stephan


On Wed, Jan 21, 2015 at 6:50 AM, Ufuk Celebi u...@apache.org wrote:

 Hey Gyula,

 On 21 Jan 2015, at 15:41, Gyula Fóra gyf...@apache.org wrote:

  Hey Guys,
 
  I think it would make sense to turn lazy operator execution off for
  streaming programs because it would make life simpler for windowing. I
  also created a JIRA issue here:
  https://issues.apache.org/jira/browse/FLINK-1425.
 
  Can anyone give me some quick pointers on how to do this? It's probably
  simple, I am just not familiar with that part of the code. (Or maybe it's
  so easy that someone could pick this up :) )

 Have a look at the JobManager ScheduleOrUpdateConsumers message, which is
 how it is done currently. The first produced buffer of an intermediate
 result triggers this message. I think the cleanest solution would be to do
 this directly when scheduling a streaming job?

  By the way, do you see any reasons why we should not do this?

 ATM, I don't.


Re: Very strange behaviour of groupBy() -> sort() -> first()

2015-01-21 Thread Fabian Hueske
Chesnay is right.
Right now, it is not possible to do what you want in a straightforward way
because Flink does not support fully sorting a data set (there are several
related issues in JIRA).

A workaround would be to attach a constant value to each tuple, group on
that (all tuples are sent to the same group), sort that group, and apply
the first operator, as sketched below.
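
A minimal sketch of that workaround, with made-up sample data standing in
for the aggregated result (class and variable names are illustrative):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// Constant-key workaround: every tuple gets the same key, so a single
// group holds the whole data set and can be sorted as one.
public class TopNWorkaround {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the aggregated (name, sum) data set from the thread.
        DataSet<Tuple2<String, Long>> sums = env.fromElements(
                new Tuple2<String, Long>("string1", 100L),
                new Tuple2<String, Long>("string2", 50L),
                new Tuple2<String, Long>("string3", 1L));

        sums.map(new MapFunction<Tuple2<String, Long>, Tuple3<Integer, String, Long>>() {
                public Tuple3<Integer, String, Long> map(Tuple2<String, Long> t) {
                    return new Tuple3<Integer, String, Long>(0, t.f0, t.f1); // constant key
                }
            })
            .groupBy(0)                     // a single group containing everything
            .sortGroup(2, Order.DESCENDING) // sort that group by the sum
            .first(3)                       // top 3 overall
            .print();

        env.execute(); // print() is a sink in this (0.8-era) API
    }
}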

2015-01-21 20:22 GMT+01:00 Chesnay Schepler chesnay.schep...@fu-berlin.de:

 If I remember correctly, first() returns the first n values for every
 group. The Javadocs actually don't make this behaviour very clear.


 On 21.01.2015 19:18, Felix Neutatz wrote:

 Hi,

 my use case is the following:

 I have a Tuple2<String, Long>. I want to group by the String and sum up the
 Long values accordingly. This works fine with these lines:

 DataSet<Lineitem> lineitems = getLineitemDataSet(env);
 lineitems.project(new int[]{3, 0}).groupBy(0).aggregate(Aggregations.SUM,
 1);

 After the aggregation I want to print the 10 groups with the highest sum,
 like:

 string1, 100L
 string2, 50L
 string3, 1L

 I tried that:

 lineitems.project(new int[]{3, 0}).groupBy(0).aggregate(Aggregations.SUM,
 1).groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print();

 But instead of 3 records, I get a lot more.

 Can you see my error?

 Best regards,

 Felix





Re: Very strange behaviour of groupBy() -> sort() -> first()

2015-01-21 Thread Chesnay Schepler
If I remember correctly, first() returns the first n values for every
group. The Javadocs actually don't make this behaviour very clear.
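
To illustrate: with two groups, first(2) emits up to two records per group,
four in total. A minimal sketch with made-up data:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

// first(n) after groupBy() is applied per group, not globally.
public class FirstPerGroup {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> data = env.fromElements(
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("a", 2),
                new Tuple2<String, Integer>("a", 3),
                new Tuple2<String, Integer>("b", 1),
                new Tuple2<String, Integer>("b", 2));

        // Emits two records for group "a" and two for group "b".
        data.groupBy(0).first(2).print();

        env.execute(); // print() is a sink in this (0.8-era) API
    }
}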


On 21.01.2015 19:18, Felix Neutatz wrote:

Hi,

my use case is the following:

I have a Tuple2<String, Long>. I want to group by the String and sum up the
Long values accordingly. This works fine with these lines:

DataSet<Lineitem> lineitems = getLineitemDataSet(env);
lineitems.project(new int[]{3, 0}).groupBy(0).aggregate(Aggregations.SUM,
1);

After the aggregation I want to print the 10 groups with the highest sum,
like:

string1, 100L
string2, 50L
string3, 1L

I tried that:

lineitems.project(new int[]{3, 0}).groupBy(0).aggregate(Aggregations.SUM,
1).groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print();

But instead of 3 records, I get a lot more.

Can you see my error?

Best regards,

Felix





Re: Very strange behaviour of groupBy() -> sort() -> first()

2015-01-21 Thread Stephan Ewen
Chesnay is right.

What you want is a non-grouped sort/first, which would need to be added...

Stephan
On 21.01.2015 11:25, Chesnay Schepler
chesnay.schep...@fu-berlin.de wrote:

 If I remember correctly, first() returns the first n values for every
 group. The Javadocs actually don't make this behaviour very clear.

 On 21.01.2015 19:18, Felix Neutatz wrote:

 Hi,

 my use case is the following:

 I have a Tuple2<String, Long>. I want to group by the String and sum up the
 Long values accordingly. This works fine with these lines:

 DataSet<Lineitem> lineitems = getLineitemDataSet(env);
 lineitems.project(new int[]{3, 0}).groupBy(0).aggregate(Aggregations.SUM,
 1);

 After the aggregation I want to print the 10 groups with the highest sum,
 like:

 string1, 100L
 string2, 50L
 string3, 1L

 I tried that:

 lineitems.project(new int[]{3, 0}).groupBy(0).aggregate(Aggregations.SUM,
 1).groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print();

 But instead of 3 records, I get a lot more.

 Can you see my error?

 Best regards,

 Felix





[jira] [Created] (FLINK-1428) Typos in Java code example for RichGroupReduceFunction

2015-01-21 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-1428:


 Summary: Typos in Java code example for RichGroupReduceFunction
 Key: FLINK-1428
 URL: https://issues.apache.org/jira/browse/FLINK-1428
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Felix Neutatz
Priority: Minor


http://flink.apache.org/docs/0.7-incubating/dataset_transformations.html

String key = null // missing ';'

public void combine(Iterable<Tuple3<String, Integer, Double>> in,
  Collector<Tuple3<String, Integer, Double>> out))
--> one ')' too much
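
For reference, a sketch of the corrected snippet. The surrounding class is
hypothetical; only the two reported typos are fixed:

import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class MyReducer extends RichGroupReduceFunction<
        Tuple3<String, Integer, Double>, Tuple3<String, Integer, Double>> {

    @Override
    public void reduce(Iterable<Tuple3<String, Integer, Double>> in,
                       Collector<Tuple3<String, Integer, Double>> out) {
        String key = null; // ';' added
        // ... reduce logic elided ...
    }

    @Override
    public void combine(Iterable<Tuple3<String, Integer, Double>> in,
                        Collector<Tuple3<String, Integer, Double>> out) { // surplus ')' removed
        // ... combine logic elided ...
    }
}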



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Master not building and how to notice it faster in the future

2015-01-21 Thread Henry Saputra
Would it be better to use the GitHub Jenkins plugin [1] to connect to the
ASF Jenkins cluster [2]?

[1] 
https://wiki.jenkins-ci.org/display/JENKINS/GitHub+pull+request+builder+plugin
[2] 
http://events.linuxfoundation.org/sites/events/files/slides/Jenkins_at_ASF_2014.pdf

On Tue, Jan 20, 2015 at 2:57 PM, Robert Metzger rmetz...@apache.org wrote:
 Hi,

 it seems that our master is currently not building. See:
 https://travis-ci.org/apache/flink/jobs/47689754

 We need to come up with a good solution to notify dev@flink when builds on
 Travis are failing.

 We also had unstable builds recently due to too short akka timeouts and it
 took some time to realize that.

 Usually, Travis has a pretty good email notification system, but that's not
 working for us because the repository is owned by the apache GitHub user.
 I think only users with admin permissions at apache/flink are notified by
 email from Travis.

 There are certainly ways to fix this. Right now, the best approach is
 probably setting up a REST-to-e-mail service somewhere which mails to
 our dev@ list (
 http://docs.travis-ci.com/user/notifications/#Webhook-notification).
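
 For reference, the webhook section of those Travis docs boils down to a few
 lines of .travis.yml. A sketch (the endpoint is a placeholder for whatever
 bridge service we end up hosting):

 # Hypothetical .travis.yml addition: post build results to a bridge
 # service that turns them into e-mails for dev@flink.
 notifications:
   webhooks:
     urls:
       - https://flink-build-mailer.appspot.com/travis  # placeholder endpoint
     on_success: never   # only report broken builds
     on_failure: always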


 Robert


Re: Implementing a list accumulator

2015-01-21 Thread Stephan Ewen
True, that is tricky. The user code does not necessarily respect the
non-reuse mode. That may be true for any user code. Can the list
accumulator immediately serialize the objects and send over a byte array?
That should work reliably without adding overhead (serialization will
happen anyway).
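
A minimal sketch of that idea, using plain Java serialization for
illustration (a real version would presumably go through Flink's own
serializers):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Sketch: eagerly serialize every added element, so a mutated and reused
// holder object cannot corrupt already-accumulated values.
public class SerializedListAccumulator<T extends Serializable> {

    private final List<byte[]> serialized = new ArrayList<byte[]>();

    public void add(T value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(value); // a private copy is made right here
            oos.close();
            serialized.add(bos.toByteArray());
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize accumulator value", e);
        }
    }

    public List<byte[]> getLocalValue() {
        return serialized; // deserialize on the client side when collecting
    }
}
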
On 21.01.2015 11:04, Ufuk Celebi u...@apache.org wrote:

 The thing is that the DefaultCrossFunction always uses the same holder
 Tuple2 object, which is then handed over to the chained collect helper
 flatMap(). I can see why it is OK to let the default functions reuse
 holder objects, but when they are chained to an operator it becomes
 problematic.

 On 21 Jan 2015, at 17:12, Ufuk Celebi u...@apache.org wrote:

  I just checked out your branch and ran it with a breakpoint set at the
 CollectHelper. If you look into the (list) accumulator, you see that the
 same object is always added to it. Strangely enough, object re-use is
 disabled in the config. I don't have time to look further into it now, but
 it seems to be a problem with the object re-use mode.
 
  – Ufuk
 
  On 20 Jan 2015, at 20:53, Max Michels m...@data-artisans.com wrote:
 
  Hi everyone,
 
  I'm running into some problems implementing an Accumulator for
  returning the elements of a DataSet as a list.
 
  https://github.com/mxm/flink/tree/count/collect
 
  Basically, it works fine in this test case:
 
 
ExecutionEnvironment env =
 ExecutionEnvironment.getExecutionEnvironment();
 
Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
 
    DataSet<Integer> data = env.fromElements(input);
 
// count
long numEntries = data.count();
assertEquals(10, numEntries);
 
// collect
    ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
assertArrayEquals(input, list.toArray());
 
 
  But with non-primitive objects strange results occur:
 
  ExecutionEnvironment env =
 ExecutionEnvironment.getExecutionEnvironment();
  env.getConfig().disableObjectReuse();
 
  DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9,
 10);
 
  DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
 
  // count
  long numEntries = data3.count();
  assertEquals(100, numEntries);
 
  // collect
  ArrayList<Tuple2<Integer, Integer>> list =
  (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
 
  System.out.println(list);
 
  
 
  Output:
 
  [(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
  (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
  (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,6), (10,6),
  (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
  (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
  (10,6), (10,6), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
  (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
  (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,9), (10,9),
  (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
  (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
  (10,9), (10,9), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
  (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
  (10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]
 
  I assume the problem is the clone() method of the ListAccumulator,
 where we just create a shallow copy. This is fine for accumulators
 that hold primitive values, like IntCounter, but here we have a real
 object.
 
  How do we work around this problem?
 
  Best regards,
  Max
 




Re: Master not building and how to notice it faster in the future

2015-01-21 Thread Robert Metzger
Is the git hook something we can control for everybody? I thought it's more
like a personal thing everybody can set up if they want?

I'm against enforcing something like this for every committer. I don't want
to wait for 15 minutes to push a typo fix to the documentation.


On Wed, Jan 21, 2015 at 11:36 AM, Ufuk Celebi u...@apache.org wrote:


 On 21 Jan 2015, at 10:40, Till Rohrmann trohrm...@apache.org wrote:

  I like both approaches because maven won't necessarily find all problems
 if
  executed locally. Especially concurrency problems seem to occur more
 often
  on travis than on my local machine.

 I agree with Till. Let's add a git hook to catch most of the local
 problems and the web hook for the Travis builds.


Re: Master not building and how to notice it faster in the future

2015-01-21 Thread Max Michels
Hi Robert,

I like your solution using Travis and Google App Engine. However, I
think there's a much simpler solution which can prevent committers from
pushing non-compiling or test-failing code to the master in the
first place.

Committers could simply install a git pre-push hook in their git
repository (this merely means placing a file named pre-push in
.git/hooks/). The pre-push hook would simply run 'mvn clean package'
and would then fail the push to master if Maven does not report
success.
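
A minimal sketch of such a hook, assuming mvn is on the PATH:

#!/bin/sh
# Minimal pre-push hook sketch: abort the push when the build fails.
# Save as .git/hooks/pre-push and make it executable (chmod +x).

echo "Running 'mvn clean package' before push..."
if ! mvn clean package -q; then
    echo "Build failed; push aborted. Bypass with 'git push --no-verify'." >&2
    exit 1
fi
exit 0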

Such a precaution would save effort and time for everyone. You already
pointed out that not all eventualities are covered by the tests, so
this could be an additional measure alongside the Travis notification
on the dev mailing list.

Best regards,
Max

On Wed, Jan 21, 2015 at 12:30 AM, Ufuk Celebi u...@apache.org wrote:

 On 21 Jan 2015, at 00:19, Robert Metzger rmetz...@apache.org wrote:

 I think it's just a missing import.

 Yes.

 Maybe we can use Google AppEngine for that. It seems that their free
 offering is sufficient for our purpose:
 https://cloud.google.com/pricing/#app-engine. It also allows sending emails.
 I guess it's hard to get the token for the Apache user. Maybe there is
 a way to validate the authenticity of the requests differently.

 App engine sounds reasonable. :-)


[jira] [Created] (FLINK-1425) Turn lazy operator execution off for streaming programs

2015-01-21 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-1425:
-

 Summary: Turn lazy operator execution off for streaming programs
 Key: FLINK-1425
 URL: https://issues.apache.org/jira/browse/FLINK-1425
 Project: Flink
  Issue Type: Task
  Components: Streaming
Affects Versions: 0.9
Reporter: Gyula Fora


Streaming programs currently use the same lazy operator execution model as
batch programs. This makes the functionality of some operators, like time-based
windowing, very awkward, since they start computing windows based on the start
of the operator.

Also, one should expect streaming programs to run continuously, so there is
not much to gain from lazy execution.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Master not building and how to notice it faster in the future

2015-01-21 Thread Ufuk Celebi
Thanks for the nice script. I've just installed it :-)

On 21 Jan 2015, at 13:57, Max Michels m...@data-artisans.com wrote:

 I've created a pre-push hook that does what I described (and a bit
 more). It only enforces the check for the remote Flink master branch
 and doesn't disturb your other pushes.
 
 https://gist.github.com/mxm/4d1e26b901c66a682e4a
 
 Just put the file in the .git/hooks/ directory of your repository.
 
 For example, if you run git push origin, it will check that the URL of
 origin contains git.apache.org/flink.git. If the remote branch is
 master, it will run 'mvn clean package' and then fail the push if
 Maven doesn't exit successfully.
 
 Here some examples where I set protected_remote to my remote at
 ssh://g...@github.com/mxm/flink.git and protected_branch to master:
 
 
 $ git push origin
 Verifying push to master via 'maven clean package'.
 ...
 Commits could not be verified. Executed 'mvn clean package' and it returned 1
 error: failed to push some refs to 'ssh://g...@github.com/mxm/flink.git'
 
 
 $ git push origin
 Verifying push to master via 'maven clean package'.
 ...
 Counting objects: 12, done.
 Delta compression using up to 8 threads.
 Compressing objects: 100% (8/8), done.
 Writing objects: 100% (12/12), 838 bytes | 0 bytes/s, done.
 Total 12 (delta 5), reused 0 (delta 0)
 To ssh://g...@github.com/mxm/flink.git
    ce8acc4..d60197b  master -> master
 
 
 $ git checkout master
 $ git push origin other:master
 Please switch to branch other to verify.
 
 
 $ vim README # make some changes
 $ git push origin
 Please commit or stash your pending changes.
 
 
 Hope this comes in handy for you.
 
 Best regards,
 Max
 
 On Wed, Jan 21, 2015 at 1:46 PM, Max Michels m...@data-artisans.com wrote:
 @Robert The pre-push hook only resides in your local repository. It
 cannot be pushed. Thus, we cannot enforce this check but it certainly
 helps to prevent mistakes. As Ufuk mentioned, you can then even skip
 the check with the --no-verify option if you're really sure.
 
 On Wed, Jan 21, 2015 at 11:45 AM, Ufuk Celebi u...@apache.org wrote:
 
 On 21 Jan 2015, at 11:40, Robert Metzger rmetz...@apache.org wrote:
 
 Is the git hook something we can control for everybody? I thought it's more
 like a personal thing everybody can set up if they want?
 
 I'm against enforcing something like this for every committer. I don't want
 to wait for 15 minutes to push a typo fix to the documentation.
 
 Sorry, I didn't mean to have it in the repo. As long as we notice failing
 builds fast enough (aka email notification), it should be OK to keep it
 that way. As you said, every committer should decide this on her own. BTW,
 you can always skip hooks with git push --no-verify.



Turn lazy operator execution off for streaming jobs

2015-01-21 Thread Gyula Fóra
Hey Guys,

I think it would make sense to turn lazy operator execution off for
streaming programs because it would make life simpler for windowing. I
also created a JIRA issue here:
https://issues.apache.org/jira/browse/FLINK-1425.

Can anyone give me some quick pointers on how to do this? It's probably
simple, I am just not familiar with that part of the code. (Or maybe it's
so easy that someone could pick this up :) )

By the way, do you see any reasons why we should not do this?

Thank you!
Gyula