Fault Tolerance for Flink Iterations
Hi everybody, I am writing my master's thesis on making Flink iterations / iterative Flink algorithms fault tolerant. The first approach I implemented is basic checkpointing, where every N iterations the current state is saved to HDFS. To do this I enabled data sinks inside iterations and attached a new checkpointing sink to the beginning of each iteration. To recover from a previous checkpoint I cancel all tasks, add a new data source in front of the iteration, and reschedule the tasks with a lower degree of parallelism. I do this from inside the JobManager at runtime, without starting a new job. The problem is that the input data to the iteration sometimes has properties such as a certain partitioning or sorting, and I am struggling to reconstruct these properties from the checkpoint source. I figured an easier way would be to re-optimize the new plan (with the new source as input to the iteration) before rescheduling. But in the current project structure flink-runtime has no access to flink-optimizer, and changing this would be a major design break. Does anybody have advice on this? Best, Markus
Re: Fault Tolerance for Flink Iterations
Hi Markus! I see your point. My first guess would be that it would be simpler to do this logic in the driver program rather than inside the JobManager. If the checkpoints are all written and the job fails, you check (by file) which was the latest completed checkpoint and then start the program again with a source that refers to those files. That way you go through the proper stack (optimizer and job graph generator), which inserts all the necessary partition and sort operations. Greetings, Stephan

On Tue, Apr 21, 2015 at 8:58 AM, Markus Holzemer holzemer.mar...@googlemail.com wrote: [original message quoted in full above]
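To make the driver-side recovery idea concrete, here is a rough sketch, assuming a 0.9-era DataSet job and a made-up checkpoint layout (hdfs:///checkpoints/<n>); the helpers latestCompletedCheckpoint, remainingIterations, and runIteration are hypothetical placeholders, not Flink API.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DriverSideRecoverySketch {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Pick the input: the original source on a fresh start, or the latest
        // completed checkpoint written by the in-iteration checkpointing sink.
        final int checkpoint = latestCompletedCheckpoint();
        final DataSet<String> input = (checkpoint < 0)
                ? env.readTextFile("hdfs:///input/data")
                : env.readTextFile("hdfs:///checkpoints/" + checkpoint);

        // Because the program is re-submitted through the driver, the optimizer
        // and job graph generator run again and re-insert the partitioning and
        // sorting that the iteration input requires.
        final DataSet<String> result = runIteration(input, remainingIterations(checkpoint));
        result.writeAsText("hdfs:///output");
        env.execute("iteration with checkpoint recovery");
    }

    // Hypothetical helpers: scanning HDFS for the newest complete checkpoint
    // directory and re-building the iteration are application-specific.
    private static int latestCompletedCheckpoint() { return -1; }
    private static int remainingIterations(int checkpoint) { return 10; }
    private static DataSet<String> runIteration(DataSet<String> in, int n) { return in; }
}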
[jira] [Created] (FLINK-1916) EOFException when running delta-iteration job
Stefan Bunk created FLINK-1916:
--
Summary: EOFException when running delta-iteration job
Key: FLINK-1916
URL: https://issues.apache.org/jira/browse/FLINK-1916
Project: Flink
Issue Type: Bug
Environment: 0.9-milestone-1
Exception on the cluster, local execution works
Reporter: Stefan Bunk

The delta-iteration program in [1] ends with a java.io.EOFException
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeLong(AbstractPagedOutputView.java:291)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeDouble(AbstractPagedOutputView.java:307)
at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:62)
at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:26)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)

For logs and the accompanying mailing list discussion see below. When running with a slightly different memory configuration, as hinted on the mailing list, I sometimes also get this exception:

19.Apr. 13:39:29 INFO Task - IterationHead(WorksetIteration (Resolved-Redirects)) (10/10) switched to FAILED : java.lang.IndexOutOfBoundsException: Index: 161, Size: 161
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.resetTo(InMemoryPartition.java:352)
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.access$100(InMemoryPartition.java:301)
at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:226)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)

[1] Flink job: https://gist.github.com/knub/0bfec859a563009c1d57
[2] Job manager logs: https://gist.github.com/knub/01e3a4b0edb8cde66ff4
[3] One task manager's logs: https://gist.github.com/knub/8f2f953da95c8d7adefc
[4] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-when-running-Flink-job-td1092.html

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Akka transparency and serialisation
Good point to raise, Paris. Here are the practices I (and others) have been using; they work well:
1) Do not assume serialization; that is true. If you need to make sure that the instance of the data is not shared after the message, send a manually serialized version. InstantiationUtil has methods to serialize/deserialize into/from byte arrays.
2) If the data involves user-defined classes, always serialize manually, because the deserialization in the Akka stack will not use the required user-code classloader. Have a look at the class SerializedValue, which serializes eagerly and deserializes lazily (with a given class loader) to overcome these situations.
3) I totally agree not to make any assumptions about the behavior of transient fields.
Stephan

On Tue, Apr 21, 2015 at 1:41 PM, Paris Carbone par...@kth.se wrote: [original message quoted in full; see "Akka transparency and serialisation" below]
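For reference, a minimal sketch of the manual pre-serialization pattern described above. InstantiationUtil and SerializedValue are real Flink utility classes, but the import paths and exact signatures here are written from memory of later Flink versions (the 0.9-era packages may differ), and the payload class is made up.

import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;

public class ManualSerializationSketch {

    // Hypothetical user-defined payload; in a real job this class would be
    // loaded by the user-code classloader, not the system classloader.
    public static class UserPayload implements java.io.Serializable {
        public final String value;
        public UserPayload(String value) { this.value = value; }
    }

    public static void main(String[] args) throws Exception {
        UserPayload payload = new UserPayload("hello");

        // Option 2 from the thread: serialize eagerly before handing the data
        // to Akka, so no implicit serialization behavior is relied upon.
        SerializedValue<UserPayload> boxed = new SerializedValue<>(payload);

        // ...send `boxed` in an Akka message; on the receiving side,
        // deserialize lazily with the user-code classloader:
        ClassLoader userCodeClassLoader = ManualSerializationSketch.class.getClassLoader();
        UserPayload restored = boxed.deserializeValue(userCodeClassLoader);

        // Alternative: a raw byte[] round trip via InstantiationUtil.
        byte[] bytes = InstantiationUtil.serializeObject(payload);
        UserPayload restored2 = InstantiationUtil.deserializeObject(bytes, userCodeClassLoader);

        System.out.println(restored.value + " / " + restored2.value);
    }
}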
Re: Periodic full stream aggregations
Hi Bruno, Of course you can do that as well. (That's the good part :p ) I will open a PR soon with the proposed changes (first without breaking the current API) and I will post it here. Cheers, Gyula

On Tuesday, April 21, 2015, Bruno Cadonna cado...@informatik.hu-berlin.de wrote:

Hi Gyula, I have a question regarding your suggestion. Can the current continuous aggregation also be specified with your proposed periodic aggregation? I am thinking about something like dataStream.reduce(...).every(Count.of(1)) Cheers, Bruno

On 20.04.2015 22:32, Gyula Fóra wrote:

Hey all, I think we are missing a quite useful feature that could be implemented (with some slight modifications) on top of the current windowing API. We currently provide two ways of aggregating (or reducing) over streams: doing a continuous aggregation and always outputting the aggregated value (which cannot be done properly in parallel), or doing the aggregation in a window periodically. What we don't have at the moment is periodic aggregation over the whole stream. I would even go as far as to remove the continuously-outputting reduce/aggregate and replace it with this version, as this in return can be done properly in parallel. My suggestion would be that a call dataStream.reduce(..) or dataStream.sum(..) would return a windowed data stream where the window is the whole record history, and the user would need to define a trigger to get the actual reduced values, like dataStream.reduce(...).every(Time.of(4,sec)) or dataStream.sum(...).every(...). I think the current data stream reduce/aggregation is very confusing without being practical for any normal use case. This would also be a very API-breaking change (but I would still make it, as it is much more intuitive than the current behaviour), so I would try to push it before the release if we can agree. Cheers, Gyula

--
~~~
Dr. Bruno Cadonna
Postdoctoral Researcher
Databases and Information Systems
Department of Computer Science
Humboldt-Universität zu Berlin
http://www.informatik.hu-berlin.de/~cadonnab
~~~
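To illustrate the proposed semantics, here is a small self-contained sketch, not the actual Flink API (which was still under discussion at this point): a full-stream reduce whose running aggregate is only emitted when a periodic, count-based trigger fires, which is what reduce(...).every(...) would express.

import java.util.function.BinaryOperator;

// Hypothetical illustration of "full-stream aggregation with a periodic
// trigger": the reduce runs continuously over the whole record history, but
// results are only emitted every `triggerCount` elements (a Count.of(n)-style
// trigger; a Time.of(...)-style trigger would fire on a timer instead).
public class PeriodicAggregationSketch {

    static <T> void reduceEvery(Iterable<T> stream, BinaryOperator<T> reducer, int triggerCount) {
        T aggregate = null;
        int seen = 0;
        for (T element : stream) {
            aggregate = (aggregate == null) ? element : reducer.apply(aggregate, element);
            if (++seen % triggerCount == 0) {
                System.out.println("emit: " + aggregate); // trigger fires
            }
        }
    }

    public static void main(String[] args) {
        // Full-stream sum, emitted every 3 elements:
        reduceEvery(java.util.Arrays.asList(1, 2, 3, 4, 5, 6), Integer::sum, 3);
        // prints: emit: 6, then emit: 21
    }
}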
Re: About Operator and OperatorBase
Originally, we had multiple APIs with different data models: the current Java API, the Record API, and a JSON API. The Common API was the data-model-agnostic set of operators on which they were built. It became redundant when we saw how well things can be built on top of the Java API using the TypeInformation. Now Scala, Python, and Dataflow all build on top of the Java API.
Re: Periodic full stream aggregations
Is it possible to switch the order of the statements, i.e., dataStream.every(Time.of(4,sec)).reduce(...) instead of dataStream.reduce(...).every(Time.of(4,sec))? I think that would be more consistent with the structure of the remaining API. Cheers, Fabian

2015-04-21 10:57 GMT+02:00 Gyula Fóra gyf...@apache.org: [earlier messages quoted in full above]
[jira] [Created] (FLINK-1919) Add HCatOutputFormat for Tuple data types
Fabian Hueske created FLINK-1919:
--
Summary: Add HCatOutputFormat for Tuple data types
Key: FLINK-1919
URL: https://issues.apache.org/jira/browse/FLINK-1919
Project: Flink
Issue Type: New Feature
Components: Java API, Scala API
Reporter: Fabian Hueske
Priority: Minor

It would be good to have an OutputFormat that can write data to HCatalog tables. The Hadoop `HCatOutputFormat` expects `HCatRecord` objects and writes these to HCatalog tables. We can do the same thing by creating these `HCatRecord` objects with a Map function that precedes a `HadoopOutputFormat` wrapping the Hadoop `HCatOutputFormat`. Better support for Flink Tuples can be added by implementing a custom `HCatOutputFormat` that also depends on the Hadoop `HCatOutputFormat` but internally converts Flink Tuples to `HCatRecords`. This would also include checking whether the schema of the HCatalog table and the Flink tuples match. For data types other than tuples, the OutputFormat could either require a preceding Map function that converts to `HCatRecords`, or let users specify a MapFunction and invoke it internally. We already have a Flink `HCatInputFormat` which does this in the reverse direction, i.e., it emits Flink Tuples from HCatalog tables.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
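A rough sketch of the interim approach (a Map to `HCatRecord` followed by the wrapped Hadoop `HCatOutputFormat`). The database/table names are made up, and the HCatalog calls (`OutputJobInfo.create`, `HCatOutputFormat.setOutput`/`setSchema`) are written from memory of the Hive HCatalog API, so treat the whole block as an assumption-laden sketch rather than verified code.

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

public class HCatWriteSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> data = env.fromElements(
                new Tuple2<>("a", 1), new Tuple2<>("b", 2));

        // Configure the Hadoop HCatOutputFormat for a (made-up) table.
        Job job = Job.getInstance();
        HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", "my_table", null));
        HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job.getConfiguration()));

        // HCatOutputFormat is declared with a WritableComparable<?> key, so a
        // raw cast narrows it to the concrete key type used here.
        @SuppressWarnings({"rawtypes", "unchecked"})
        OutputFormat<NullWritable, HCatRecord> hcatFormat = (OutputFormat) new HCatOutputFormat();

        // Map Flink tuples to (key, HCatRecord) pairs, then write through the
        // HadoopOutputFormat wrapper.
        data.map(new MapFunction<Tuple2<String, Integer>, Tuple2<NullWritable, HCatRecord>>() {
                @Override
                public Tuple2<NullWritable, HCatRecord> map(Tuple2<String, Integer> t) {
                    DefaultHCatRecord rec = new DefaultHCatRecord(2);
                    rec.set(0, t.f0);
                    rec.set(1, t.f1);
                    return new Tuple2<NullWritable, HCatRecord>(NullWritable.get(), rec);
                }
            })
            .output(new HadoopOutputFormat<NullWritable, HCatRecord>(hcatFormat, job));

        env.execute("write to HCatalog");
    }
}
{code}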
[jira] [Created] (FLINK-1917) EOFException when running delta-iteration job
Stefan Bunk created FLINK-1917:
--
Summary: EOFException when running delta-iteration job
Key: FLINK-1917
URL: https://issues.apache.org/jira/browse/FLINK-1917
Project: Flink
Issue Type: Bug
Components: Core, Distributed Runtime, Iterations
Environment: 0.9-milestone-1
Exception on the cluster, local execution works
Reporter: Stefan Bunk

The delta-iteration program in [1] ends with a java.io.EOFException
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeLong(AbstractPagedOutputView.java:291)
at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeDouble(AbstractPagedOutputView.java:307)
at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:62)
at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:26)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)

For logs and the accompanying mailing list discussion see below. When running with a slightly different memory configuration, as hinted on the mailing list, I sometimes also get this exception:

19.Apr. 13:39:29 INFO Task - IterationHead(WorksetIteration (Resolved-Redirects)) (10/10) switched to FAILED : java.lang.IndexOutOfBoundsException: Index: 161, Size: 161
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.resetTo(InMemoryPartition.java:352)
at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.access$100(InMemoryPartition.java:301)
at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:226)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
at java.lang.Thread.run(Thread.java:745)

[1] Flink job: https://gist.github.com/knub/0bfec859a563009c1d57
[2] Job manager logs: https://gist.github.com/knub/01e3a4b0edb8cde66ff4
[3] One task manager's logs: https://gist.github.com/knub/8f2f953da95c8d7adefc
[4] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-when-running-Flink-job-td1092.html

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: About Operator and OperatorBase
Thanks for the explanation, Stephan. I always wondered why the extra Common API existed. Then I think this should be high priority if we want to remove the Common API to reduce the unnecessary layer and dead code. As Ufuk mentioned before, it is better to do it now, before more stuff is built on top of Flink. So removing the old Record API [1] and the tests depending on it is step one of the process, but what is the JSON API? - Henry [1] https://issues.apache.org/jira/browse/FLINK-1681

On Tue, Apr 21, 2015 at 1:10 AM, Stephan Ewen se...@apache.org wrote: [Stephan's reply quoted in full above]
Re: Periodic full stream aggregations
Hey, The current code supports two types of aggregation: a simple binary reduce, (T, T) => T, and the grouped version of it, where the reduce function is applied per user-defined key (so there we keep a map of reduced values). This can already be used to implement fairly complex logic if we transform the data to a proper type before passing it to the reducer. As a next step we can make this work with fold + combiners as well, where your initial data type is T, your fold function is (T, R) => R, and the combiner is (R, R) => R. At that point I think any sensible aggregation can be implemented. Regards, Gyula

On Tue, Apr 21, 2015 at 10:50 PM, Bruno Cadonna cado...@informatik.hu-berlin.de wrote: [quoted exchange trimmed; the full chain appears in Bruno's message below]
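A small illustration of the function shapes Gyula describes, written with plain Java functional interfaces rather than the exact Flink interfaces: a binary reduce (T, T) => T, and a fold (T, R) => R with a combiner (R, R) => R. The accumulator here is a HashSet-based synopsis along the lines of Bruno's count-distinct question; a real implementation would use an approximate sketch such as HyperLogLog.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

public class AggregationShapesSketch {
    public static void main(String[] args) {
        List<String> stream = Arrays.asList("a", "b", "a", "c", "b");

        // (T, T) => T : the simple binary reduce, here string concatenation.
        BinaryOperator<String> reduce = (x, y) -> x + y;
        System.out.println(stream.stream().reduce(reduce).get()); // abacb

        // (T, R) => R : fold an element into an accumulator of a different
        // type; the Set<String> synopsis stands in for something like
        // HyperLogLog for approximate count-distinct.
        BiFunction<String, Set<String>, Set<String>> fold = (elem, acc) -> {
            acc.add(elem);
            return acc;
        };

        // (R, R) => R : combine two partial synopses, which is what makes the
        // aggregation parallelizable.
        BinaryOperator<Set<String>> combine = (a, b) -> {
            a.addAll(b);
            return a;
        };

        // Fold two "partitions" independently, then combine, as a parallel
        // runtime would.
        Set<String> part1 = new HashSet<>();
        Set<String> part2 = new HashSet<>();
        for (String s : stream.subList(0, 2)) part1 = fold.apply(s, part1);
        for (String s : stream.subList(2, 5)) part2 = fold.apply(s, part2);

        Set<String> synopsis = combine.apply(part1, part2);
        System.out.println("distinct count: " + synopsis.size()); // 3
    }
}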
Re: Periodic full stream aggregations
Hi Gyula, fair enough! I used a bad example. What I really wanted to know is whether your code supports only aggregations like sum, min, and max, where you only need to pass a value to the next aggregation, or also more complex data structures, e.g., a synopsis of the full stream, to compute an aggregation such as an approximate count distinct (item count)? Cheers, Bruno

On 21.04.2015 15:18, Gyula Fóra wrote: You are right, but you should never try to compute a full-stream median, that's the point :D

On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna cado...@informatik.hu-berlin.de wrote: Hi Gyula, I read the comments on your PR. I have a question about this comment: "It only allows aggregations so we don't need to keep the full history in a buffer." What if the user implements an aggregation function like a median? For a median you need the full history, don't you? Am I missing something? Cheers, Bruno

On 21.04.2015 14:31, Gyula Fóra wrote: I have opened a PR for this feature: https://github.com/apache/flink/pull/614 Cheers, Gyula [remainder of quoted thread duplicates the messages above and is trimmed]

--
~~~
Dr. Bruno Cadonna
Postdoctoral Researcher
Databases and Information Systems
Department of Computer Science
Humboldt-Universität zu Berlin
http://www.informatik.hu-berlin.de/~cadonnab
~~~
Re: Periodic full stream aggregations
That's a good idea, I will modify my PR to that :) Gyula

On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske fhue...@gmail.com wrote: [Fabian's suggestion quoted in full above]
Akka transparency and serialisation
Hello everyone, Many of you are already aware of this, but it is good to make it clear on the mailing list. We have bumped into this special case with Akka several times already, and it is important to know where transparency actually breaks. In short, Akka serialises only messages that get transferred over the wire or across JVMs [1]. Thus, we should not rely on messages getting serialised for anything we want to transfer using Akka. To overcome this we should either:
1) do a deep copy of everything passed via Akka messaging,
2) apply serialisation manually before sending messages and transfer only pre-serialized data, or
3) never rely on transient fields.
cheers Paris
[1] http://doc.akka.io/docs/akka/snapshot/general/remoting.html
[jira] [Created] (FLINK-1918) NullPointerException at org.apache.flink.client.program.Client's constructor while using ExecutionEnvironment.createRemoteEnvironment
Zoltán Zvara created FLINK-1918:
--
Summary: NullPointerException at org.apache.flink.client.program.Client's constructor while using ExecutionEnvironment.createRemoteEnvironment
Key: FLINK-1918
URL: https://issues.apache.org/jira/browse/FLINK-1918
Project: Flink
Issue Type: Bug
Components: YARN Client
Reporter: Zoltán Zvara

Trace:
{code}
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.client.program.Client.<init>(Client.java:104)
at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:86)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:70)
at Wordcount.main(Wordcount.java:23)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
{code}
The constructor tries to set the configuration parameter {{jobmanager.rpc.address}} using {{jobManagerAddress.getAddress().getHostAddress()}}, but {{jobManagerAddress.holder.addr}} is {{null}}. {{jobManagerAddress.holder.hostname}} and {{port}} hold the valid information.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
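The reported state matches the behavior of an unresolved java.net.InetSocketAddress: when the address was never resolved, getAddress() returns null even though hostname and port are set. A small stand-alone demonstration (plain JDK, not Flink code; the hostname is made up):

{code}
import java.net.InetSocketAddress;

public class UnresolvedAddressDemo {
    public static void main(String[] args) {
        // An unresolved address keeps hostname and port but has no InetAddress.
        InetSocketAddress addr = InetSocketAddress.createUnresolved("jobmanager-host", 6123);

        System.out.println(addr.getHostName()); // jobmanager-host
        System.out.println(addr.getPort());     // 6123
        System.out.println(addr.getAddress());  // null

        // addr.getAddress().getHostAddress() would therefore throw the
        // NullPointerException seen in the trace; addr.getHostName() is safe.
    }
}
{code}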