Re: Turn lazy operator execution off for streaming jobs
Thank you! I will play around with it. On Wed, Jan 21, 2015 at 3:50 PM, Ufuk Celebi u...@apache.org wrote: Hey Gyula, On 21 Jan 2015, at 15:41, Gyula Fóra gyf...@apache.org wrote: Hey Guys, I think it would make sense to turn lazy operator execution off for streaming programs because it would make life simpler for windowing. I also created a JIRA issue here https://issues.apache.org/jira/browse/FLINK-1425. Can anyone give me some quick pointers on how to do this? It's probably simple, I am just not familiar with that part of the code. (Or maybe it's so easy that someone could pick this up :) ) Have a look at the JobManager ScheduleOrUpdateConsumers message, which is how it is done currently. The first produced buffer of an intermediate result triggers this message. I think the cleanest solution would be to do this directly when scheduling a streaming job? By the way, do you see any reasons why we should not do this? ATM, I don't.
Very strange behaviour of groupBy() - sort() - first()
Hi, my use case is the following: I have a Tuple2<String, Long>. I want to group by the String and sum up the Long values accordingly. This works fine with these lines: DataSet<Lineitem> lineitems = getLineitemDataSet(env); lineitems.project(new int []{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1); After the aggregation I want to print the 10 groups with the highest sum, like: string1, 100L string2, 50L string3, 1L I tried that: lineitems.project(new int []{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1).groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print(); But instead of 3 records, I get a lot more. Can you see my error? Best regards, Felix
Re: [flink-streaming] Regarding loops in the Job Graph
Hi Paris! The Streaming API allows you to define iterations, where parts of the stream are fed back. Do those work for you? In general, cyclic flows are a tricky thing, as the topological order of operators is needed for scheduling (may not be important for continuous streams) but also for a clear producer/consumer relationship, which is important for fault tolerance techniques. Currently, the JobManager topologically sorts the job graph and starts scheduling operators. I am surprised to hear that a graph with cyclic dependencies works... Stephan On Wed, Jan 21, 2015 at 2:57 AM, Paris Carbone par...@kth.se wrote: Hello, While implementing the SAMOA adapter for Flink-Streaming we stumbled upon the need to allow loops (or circular dependencies) in the job graph. Many incremental machine learning tasks already define loops and there is no trivial way of getting around it. In the streaming job graph builder there is only a check that does not allow the user to submit graphs with loops; however, from what Gyula told me, if the check is removed the streaming job runs as expected. Is there (still) a major reason for having this check, at least in the streaming component? Paris
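The topological-sort constraint Stephan mentions can be illustrated in plain Java. This is a hedged sketch, not Flink's actual ExecutionGraph code: Kahn's algorithm only emits a node once all of its predecessors have been emitted, so any node sitting on a cycle is never scheduled.

```java
import java.util.*;

public class TopoSortCycleDemo {
    // Kahn's algorithm: returns nodes in topological order. Nodes on a
    // cycle are never emitted because their in-degree never reaches zero.
    static List<String> topoSort(Map<String, List<String>> adj) {
        Map<String, Integer> inDegree = new HashMap<>();
        for (String n : adj.keySet()) inDegree.putIfAbsent(n, 0);
        for (List<String> targets : adj.values())
            for (String t : targets) inDegree.merge(t, 1, Integer::sum);

        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet())
            if (e.getValue() == 0) ready.add(e.getKey());

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String n = ready.poll();
            order.add(n);
            for (String t : adj.getOrDefault(n, List.of()))
                if (inDegree.merge(t, -1, Integer::sum) == 0) ready.add(t);
        }
        return order;
    }

    public static void main(String[] args) {
        // source -> mapA -> mapB -> sink, plus a feedback edge mapB -> mapA
        Map<String, List<String>> jobGraph = new HashMap<>();
        jobGraph.put("source", List.of("mapA"));
        jobGraph.put("mapA", List.of("mapB"));
        jobGraph.put("mapB", List.of("mapA", "sink"));
        jobGraph.put("sink", List.of());

        // Only "source" can be ordered; the other tasks are stuck behind the cycle.
        System.out.println(topoSort(jobGraph));
    }
}
```

With the feedback edge in place, only the source comes out of the sort, which is why a scheduler that walks the topological order stalls on cyclic graphs. As far as I understand, the streaming iterations avoid this by modelling the feedback edge specially rather than as a plain graph edge.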
[jira] [Created] (FLINK-1430) Add test for streaming scala api completeness
Márton Balassi created FLINK-1430: - Summary: Add test for streaming scala api completeness Key: FLINK-1430 URL: https://issues.apache.org/jira/browse/FLINK-1430 Project: Flink Issue Type: Test Components: Streaming Affects Versions: 0.9 Reporter: Márton Balassi Currently the completeness of the streaming scala api is not tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Turn lazy operator execution off for streaming jobs
I think that this is a fairly delicate thing. The execution graph / scheduling is the most delicate part of the system. I would not feel too well about a quick fix there, so let's think this through a little bit. The logic currently does the following: 1) It schedules the sources (see ExecutionGraph.scheduleForExecution()) 2) Successors of operators are scheduled when the intermediate result partition / queue tells the master that data is available. 3) The successor requests the stream from the producer. Possible changes: - We could definitely change the ExecutionGraph.scheduleForExecution() method to deploy all tasks immediately. I would suggest to have a schedule mode attached to the JobGraph that defines how to do that. The mode could have the values FROM_SOURCES, ALL, and BACK_TRACKING. (FROM_SOURCES is what we do right now, BACK_TRACKING is what we will do in the next release, ALL is what you need.) - When all tasks are scheduled immediately, it may be that for a channel, the sender is not yet deployed when the receiver is deployed. That should be okay, since the same can happen right now when all-to-all patterns connect the tasks. - The queues would still send notifications to the JobManager that data is available, but the JM will see that the target task is already deployed (or currently being deployed). Then the info about where to grab a channel from would need to be sent to the task. That mechanism also exists already. @Ufuk: It seems that it may actually work to simply kick off the deployment of all tasks immediately (in the ExecutionGraph.scheduleForExecution() method). Do you see any other implications? Greetings, Stephan On Wed, Jan 21, 2015 at 6:50 AM, Ufuk Celebi u...@apache.org wrote: Hey Gyula, On 21 Jan 2015, at 15:41, Gyula Fóra gyf...@apache.org wrote: Hey Guys, I think it would make sense to turn lazy operator execution off for streaming programs because it would make life simpler for windowing. 
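To make the proposal concrete, here is a minimal sketch of what such a schedule mode could look like. All names (ScheduleMode, tasksToDeploy) are assumptions for illustration, not Flink's actual scheduler code:

```java
import java.util.*;

public class EagerSchedulingSketch {
    // Hypothetical schedule mode attached to the JobGraph, as proposed above.
    enum ScheduleMode {
        FROM_SOURCES,  // current behaviour: deploy sources, successors are lazy
        ALL,           // streaming: deploy every task of the graph immediately
        BACK_TRACKING  // planned: deploy starting from missing intermediate results
    }

    // Decides which tasks to deploy when scheduling kicks off.
    static List<String> tasksToDeploy(ScheduleMode mode,
                                      List<String> sources,
                                      List<String> allTasks) {
        switch (mode) {
            case FROM_SOURCES:
                // successors are deployed later, triggered by the first buffer
                return sources;
            case ALL:
                // eager deployment: what streaming jobs need
                return allTasks;
            case BACK_TRACKING:
                throw new UnsupportedOperationException("planned for a later release");
            default:
                throw new IllegalStateException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        List<String> sources = List.of("source");
        List<String> all = List.of("source", "window", "sink");
        System.out.println(tasksToDeploy(ScheduleMode.FROM_SOURCES, sources, all));
        System.out.println(tasksToDeploy(ScheduleMode.ALL, sources, all));
    }
}
```

The point is only that ALL short-circuits the lazy trigger path; the JobManager notifications described above would then find the receivers already deployed.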
I also created a JIRA issue here https://issues.apache.org/jira/browse/FLINK-1425. Can anyone give me some quick pointers on how to do this? It's probably simple, I am just not familiar with that part of the code. (Or maybe it's so easy that someone could pick this up :) ) Have a look at the JobManager ScheduleOrUpdateConsumers message, which is how it is done currently. The first produced buffer of an intermediate result triggers this message. I think the cleanest solution would be to do this directly when scheduling a streaming job? By the way, do you see any reasons why we should not do this? ATM, I don't.
Re: Very strange behaviour of groupBy() - sort() - first()
Chesnay is right. Right now, it is not possible to do what you want in a straightforward way because Flink does not support fully sorting a data set (there are several related issues in JIRA). A workaround would be to attach a constant value to each tuple, group on that (all tuples are sent to the same group), sort that group, and apply the first operator. 2015-01-21 20:22 GMT+01:00 Chesnay Schepler chesnay.schep...@fu-berlin.de: If I remember correctly, first() returns the first n values for every group. The Javadocs actually don't make this behaviour very clear. On 21.01.2015 19:18, Felix Neutatz wrote: Hi, my use case is the following: I have a Tuple2<String, Long>. I want to group by the String and sum up the Long values accordingly. This works fine with these lines: DataSet<Lineitem> lineitems = getLineitemDataSet(env); lineitems.project(new int []{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1); After the aggregation I want to print the 10 groups with the highest sum, like: string1, 100L string2, 50L string3, 1L I tried that: lineitems.project(new int []{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1).groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print(); But instead of 3 records, I get a lot more. Can you see my error? Best regards, Felix
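To illustrate the semantics of the workaround outside of Flink, here is a plain-Java sketch (names and data are made up; it only mirrors the dataflow logic, it does not use the DataSet API): sum per key, then treat all results as a single group, sort it descending, and take the first n globally.

```java
import java.util.*;
import java.util.stream.*;

public class TopNBySum {
    // Mirrors: groupBy(key).aggregate(SUM), then the "constant key" trick:
    // all summed tuples land in one group, so sorting that group and taking
    // first(n) yields a global top-n instead of n records per group.
    static List<Map.Entry<String, Long>> topN(List<Map.Entry<String, Long>> records, int n) {
        Map<String, Long> sums = records.stream()
            .collect(Collectors.groupingBy(Map.Entry::getKey,
                     Collectors.summingLong(Map.Entry::getValue)));
        return sums.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // (key, value) records standing in for the Tuple2<String, Long> data set
        List<Map.Entry<String, Long>> records = List.of(
            Map.entry("string1", 60L), Map.entry("string2", 50L),
            Map.entry("string1", 40L), Map.entry("string3", 1L));
        topN(records, 3).forEach(e ->
            System.out.println(e.getKey() + ", " + e.getValue()));
        // string1 sums to 100 and comes first, then string2, then string3
    }
}
```

In Felix's original program the second groupBy(0) re-groups by the String key, so first(3) runs once per group, which is why far more than 3 records come out.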
Re: Very strange behaviour of groupBy() - sort() - first()
If I remember correctly, first() returns the first n values for every group. The Javadocs actually don't make this behaviour very clear. On 21.01.2015 19:18, Felix Neutatz wrote: Hi, my use case is the following: I have a Tuple2<String, Long>. I want to group by the String and sum up the Long values accordingly. This works fine with these lines: DataSet<Lineitem> lineitems = getLineitemDataSet(env); lineitems.project(new int []{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1); After the aggregation I want to print the 10 groups with the highest sum, like: string1, 100L string2, 50L string3, 1L I tried that: lineitems.project(new int []{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1).groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print(); But instead of 3 records, I get a lot more. Can you see my error? Best regards, Felix
Re: Very strange behaviour of groupBy() - sort() - first()
Chesnay is right. What you want is a non-grouped sort/first, which would need to be added... Stephan On 21.01.2015 at 11:25, Chesnay Schepler chesnay.schep...@fu-berlin.de wrote: If I remember correctly, first() returns the first n values for every group. The Javadocs actually don't make this behaviour very clear. On 21.01.2015 19:18, Felix Neutatz wrote: Hi, my use case is the following: I have a Tuple2<String, Long>. I want to group by the String and sum up the Long values accordingly. This works fine with these lines: DataSet<Lineitem> lineitems = getLineitemDataSet(env); lineitems.project(new int []{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1); After the aggregation I want to print the 10 groups with the highest sum, like: string1, 100L string2, 50L string3, 1L I tried that: lineitems.project(new int []{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1).groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print(); But instead of 3 records, I get a lot more. Can you see my error? Best regards, Felix
[jira] [Created] (FLINK-1428) Typos in Java code example for RichGroupReduceFunction
Felix Neutatz created FLINK-1428: Summary: Typos in Java code example for RichGroupReduceFunction Key: FLINK-1428 URL: https://issues.apache.org/jira/browse/FLINK-1428 Project: Flink Issue Type: Bug Components: Project Website Reporter: Felix Neutatz Priority: Minor http://flink.apache.org/docs/0.7-incubating/dataset_transformations.html String key = null //missing ';' public void combine(Iterable<Tuple3<String, Integer, Double>> in, Collector<Tuple3<String, Integer, Double>> out)) -- one ')' too much -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Master not building and how to notice it faster in the future
Would it be better to use the GitHub Jenkins plugin [1] to connect to the ASF Jenkins cluster? [1] https://wiki.jenkins-ci.org/display/JENKINS/GitHub+pull+request+builder+plugin [2] http://events.linuxfoundation.org/sites/events/files/slides/Jenkins_at_ASF_2014.pdf On Tue, Jan 20, 2015 at 2:57 PM, Robert Metzger rmetz...@apache.org wrote: Hi, it seems that our master is currently not building. See: https://travis-ci.org/apache/flink/jobs/47689754 We need to come up with a good solution to notify dev@flink when builds on Travis are failing. We also had unstable builds recently due to too short Akka timeouts and it took some time to realize that. Usually, Travis has a pretty good email notification system, but that's not working for us because the repository is owned by the apache GitHub user. I think only users with admin permissions at apache/flink are notified by email from Travis. There are certainly ways to fix this. Right now, the best approach is probably setting up a REST-to-email service somewhere which mails to our dev@ list ( http://docs.travis-ci.com/user/notifications/#Webhook-notification). Robert
Re: Implementing a list accumulator
True, that is tricky. The user code does not necessarily respect the non-reuse mode. That may be true for any user code. Can the list accumulator immediately serialize the objects and send over a byte array? That should work reliably without adding overhead (serialization will happen anyway). On 21.01.2015 at 11:04, Ufuk Celebi u...@apache.org wrote: The thing is that the DefaultCrossFunction always uses the same holder Tuple2 object, which is then handed over to the chained collect helper flatMap(). I can see why it is OK to keep the default functions reusing holder objects, but when they are chained to an operator it becomes problematic. On 21 Jan 2015, at 17:12, Ufuk Celebi u...@apache.org wrote: I just checked out your branch and ran it with a break point set at the CollectHelper. If you look into the (list) accumulator you see that always the same object is added to it. Strangely enough, object re-use is disabled in the config. I don't have time to look further into it now, but it seems to be a problem with the object re-use mode. – Ufuk On 20 Jan 2015, at 20:53, Max Michels m...@data-artisans.com wrote: Hi everyone, I'm running into some problems implementing an Accumulator for returning a list of a DataSet. 
https://github.com/mxm/flink/tree/count/collect Basically, it works fine in this test case: ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; DataSet<Integer> data = env.fromElements(input); // count long numEntries = data.count(); assertEquals(10, numEntries); // collect ArrayList<Integer> list = (ArrayList<Integer>) data.collect(); assertArrayEquals(input, list.toArray()); But with non-primitive objects strange results occur: ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableObjectReuse(); DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2); // count long numEntries = data3.count(); assertEquals(100, numEntries); // collect ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer, Integer>>) data3.collect(); System.out.println(list); Output: [(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8)] I assume the problem is the clone() method of the ListAccumulator where we just create a shallow copy. 
This is fine for accumulators which use primitive objects, like IntCounter, but here we have a real object. How do we work around this problem? Best regards, Max
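The effect Max describes can be reproduced without Flink. In this hedged sketch (the Holder class is a stand-in for the reused Tuple2 instance; none of this is Flink's accumulator code), storing the reference records the same mutated object on every add, while copying at add time (which is what eagerly serializing to a byte array would achieve) preserves each value:

```java
import java.util.*;

public class ReuseDemo {
    // Mutable holder standing in for the runtime's reused Tuple2 instance.
    static class Holder { int f0; Holder(int f0) { this.f0 = f0; } }

    // Simulates the accumulator: the runtime mutates one object per record.
    // copyOnAdd=false stores the reference (the buggy shallow behaviour);
    // copyOnAdd=true copies at add time (what serializing eagerly would do).
    static List<Integer> collect(boolean copyOnAdd) {
        Holder reused = new Holder(0);
        List<Holder> acc = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            reused.f0 = i;  // the runtime reuses and mutates the same object
            acc.add(copyOnAdd ? new Holder(reused.f0) : reused);
        }
        List<Integer> out = new ArrayList<>();
        for (Holder h : acc) out.add(h.f0);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(false)); // every entry shows the last value
        System.out.println(collect(true));  // each value preserved
    }
}
```

This matches the all-(10,x) output in the thread: the cross function's reused holder ends up in the list many times, so every reference reflects its final state.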
Re: Master not building and how to notice it faster in the future
Is the git hook something we can control for everybody? I thought it's more like a personal thing everybody can set up if wanted? I'm against enforcing something like this for every committer. I don't want to wait for 15 minutes for pushing a typo fix to the documentation. On Wed, Jan 21, 2015 at 11:36 AM, Ufuk Celebi u...@apache.org wrote: On 21 Jan 2015, at 10:40, Till Rohrmann trohrm...@apache.org wrote: I like both approaches because Maven won't necessarily find all problems if executed locally. Especially concurrency problems seem to occur more often on Travis than on my local machine. I agree with Till. Let's add a git hook to catch most of the local problems and the web hook for the Travis builds.
Re: Master not building and how to notice it faster in the future
Hi Robert, I like your solution using Travis and Google App Engine. However, I think there's a much simpler solution which can prevent committers from pushing non-compiling or test-failing code to the master in the first place. Committers could simply install a git pre-push hook in their git repository (this is merely placing a file named pre-push in .git/hooks/). The pre-push hook would simply run mvn clean package and would then fail to push to the master if Maven does not report success. Such a precaution would save effort and time for everyone. You already pointed out that not all eventualities are covered by the tests, so this could be an additional measure to the Travis notification on the dev mailing list. Best regards, Max On Wed, Jan 21, 2015 at 12:30 AM, Ufuk Celebi u...@apache.org wrote: On 21 Jan 2015, at 00:19, Robert Metzger rmetz...@apache.org wrote: I think it's just a missing import. Yes. Maybe we can use Google App Engine for that. It seems that their free offering is sufficient for our purpose: https://cloud.google.com/pricing/#app-engine. It also allows sending emails. I guess it's hard to get the token for the apache user. Maybe there is a way to validate the authenticity of the requests differently. App Engine sounds reasonable. :-)
[jira] [Created] (FLINK-1425) Turn lazy operator execution off for streaming programs
Gyula Fora created FLINK-1425: - Summary: Turn lazy operator execution off for streaming programs Key: FLINK-1425 URL: https://issues.apache.org/jira/browse/FLINK-1425 Project: Flink Issue Type: Task Components: Streaming Affects Versions: 0.9 Reporter: Gyula Fora Streaming programs currently use the same lazy operator execution model as batch programs. This makes the functionality of some operators like time based windowing very awkward, since they start computing windows based on the start of the operator. Also, one should expect for streaming programs to run continuously so there is not much to gain from lazy execution. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Master not building and how to notice it faster in the future
Thanks for the nice script. I've just installed it :-) On 21 Jan 2015, at 13:57, Max Michels m...@data-artisans.com wrote: I've created a pre-push hook that does what I described (and a bit more). It only enforces a check for the remote flink master branch and doesn't disturb you on your other pushes. https://gist.github.com/mxm/4d1e26b901c66a682e4a Just put the file in the .git/hooks/ directory of your repository. For example, if you run git push origin, it will check the URL of origin to contain git.apache.org/flink.git. If the remote branch is master, it will run 'mvn clean package' and then fail to push if Maven doesn't exit successfully. Here are some examples where I set protected_remote to my remote at ssh://g...@github.com/mxm/flink.git and protected_branch to master: $ git push origin Verifying push to master via 'maven clean package'. ... Commits could not be verified. Executed 'mvn clean package' and it returned 1 error: failed to push some refs to 'ssh://g...@github.com/mxm/flink.git' $ git push origin Verifying push to master via 'maven clean package'. ... Counting objects: 12, done. Delta compression using up to 8 threads. Compressing objects: 100% (8/8), done. Writing objects: 100% (12/12), 838 bytes | 0 bytes/s, done. Total 12 (delta 5), reused 0 (delta 0) To ssh://g...@github.com/mxm/flink.git ce8acc4..d60197b master -> master $ git checkout master $ git push origin other:master Please switch to branch other to verify. $ vim README # make some changes $ git push origin Please commit or stash your pending changes. Hope this comes in handy for you. Best regards, Max On Wed, Jan 21, 2015 at 1:46 PM, Max Michels m...@data-artisans.com wrote: @Robert The pre-push hook only resides in your local repository. It cannot be pushed. Thus, we cannot enforce this check but it certainly helps to prevent mistakes. As Ufuk mentioned, you can then even skip the check with the --no-verify option if you're really sure. 
On Wed, Jan 21, 2015 at 11:45 AM, Ufuk Celebi u...@apache.org wrote: On 21 Jan 2015, at 11:40, Robert Metzger rmetz...@apache.org wrote: Is the git hook something we can control for everybody? I thought it's more like a personal thing everybody can set up if wanted? I'm against enforcing something like this for every committer. I don't want to wait for 15 minutes for pushing a typo fix to the documentation. Sorry, I didn't mean to have it in the repo. As long as we notice failing builds fast enough (aka email notification), it should be OK to keep it that way. As you said, every committer should decide this on her own. BTW You can always skip hooks with git push --no-verify.
Turn lazy operator execution off for streaming jobs
Hey Guys, I think it would make sense to turn lazy operator execution off for streaming programs because it would make life simpler for windowing. I also created a JIRA issue here https://issues.apache.org/jira/browse/FLINK-1425. Can anyone give me some quick pointers on how to do this? It's probably simple, I am just not familiar with that part of the code. (Or maybe it's so easy that someone could pick this up :) ) By the way, do you see any reasons why we should not do this? Thank you! Gyula