Fault Tolerance for Flink Iterations

2015-04-21 Thread Markus Holzemer
Hi everybody,

I am writing my master's thesis on making Flink iterations / iterative
Flink algorithms fault tolerant.
The first approach I implemented is basic checkpointing, where the current
state is saved to HDFS every N iterations.
To do this I enabled data sinks inside iterations and attached a
checkpointing sink to the beginning of each iteration. To recover from a
previous checkpoint I cancel all tasks, add a new data source in front of
the iteration and reschedule the tasks with a lower degree of parallelism
(DOP). I do this from the JobManager at runtime, without starting a new job.
The problem is that the input data to the iteration sometimes has
properties such as a certain partitioning or sorting, and I am struggling
to reconstruct these properties from the checkpoint source.
I figured an easier way would be to re-optimize the new plan
(with the new source as input to the iteration) before rescheduling.
But in the current project structure, flink-runtime has no access to
flink-optimizer, and changing this would be a major design break.
Does anybody have advice on this?

best,
Markus


Re: Fault Tolerance for Flink Iterations

2015-04-21 Thread Stephan Ewen
Hi Markus!

I see your point. My first guess is that it would be simpler to do
this logic in the driver program rather than inside the JobManager.
If the checkpoints are all written and the job fails, you check which
checkpoint completed last (by file) and then start the program again with
a source that refers to those files.

That way you go through the proper stack (optimizer and JobGraph
generator), which inserts all the necessary partition and
sort operations.

Greetings,
Stephan
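
A minimal sketch of that driver-side retry loop, assuming a made-up checkpoint
layout (one HDFS directory per checkpoint, marked complete with a _SUCCESS file)
and a placeholder runIteration(...) that assembles and executes the actual Flink
program; none of this is existing Flink API:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckpointedIterationDriver {

    public static void main(String[] args) throws Exception {
        // placeholder layout: one sub-directory per checkpoint, e.g. .../checkpoint-0005
        Path checkpointRoot = new Path("hdfs:///checkpoints/my-iteration");
        FileSystem fs = checkpointRoot.getFileSystem(new Configuration());

        while (true) {
            Path latest = latestCompleteCheckpoint(fs, checkpointRoot);
            try {
                // Assemble and execute the Flink program. When `latest` is non-null, the
                // iteration input is a source reading the checkpointed state instead of
                // the original input, so the optimizer re-establishes partitioning and
                // sort properties as usual.
                runIteration(latest);
                return; // finished successfully
            } catch (Exception jobFailed) {
                // fall through and retry from the newest fully written checkpoint
                // (a real driver would bound the number of retries)
            }
        }
    }

    // Picks the checkpoint directory with the highest number that has a _SUCCESS marker.
    private static Path latestCompleteCheckpoint(FileSystem fs, Path root) throws Exception {
        Path best = null;
        long bestId = -1;
        if (!fs.exists(root)) {
            return null;
        }
        for (FileStatus dir : fs.listStatus(root)) {
            if (!dir.isDirectory() || !fs.exists(new Path(dir.getPath(), "_SUCCESS"))) {
                continue;
            }
            String digits = dir.getPath().getName().replaceAll("\\D+", "");
            if (!digits.isEmpty() && Long.parseLong(digits) > bestId) {
                bestId = Long.parseLong(digits);
                best = dir.getPath();
            }
        }
        return best;
    }

    private static void runIteration(Path checkpointOrNull) throws Exception {
        // placeholder: assemble the ExecutionEnvironment plan here (reading from
        // checkpointOrNull when it is non-null) and call env.execute()
        System.out.println("would execute iteration, resuming from: " + checkpointOrNull);
    }
}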



On Tue, Apr 21, 2015 at 8:58 AM, Markus Holzemer 
holzemer.mar...@googlemail.com wrote:

 Hi everybody,

 I am writing my master's thesis on making Flink iterations / iterative
 Flink algorithms fault tolerant.
 The first approach I implemented is basic checkpointing, where the current
 state is saved to HDFS every N iterations.
 To do this I enabled data sinks inside iterations and attached a
 checkpointing sink to the beginning of each iteration. To recover from a
 previous checkpoint I cancel all tasks, add a new data source in front of
 the iteration and reschedule the tasks with a lower degree of parallelism
 (DOP). I do this from the JobManager at runtime, without starting a new job.
 The problem is that the input data to the iteration sometimes has
 properties such as a certain partitioning or sorting, and I am struggling
 to reconstruct these properties from the checkpoint source.
 I figured an easier way would be to re-optimize the new plan
 (with the new source as input to the iteration) before rescheduling.
 But in the current project structure, flink-runtime has no access to
 flink-optimizer, and changing this would be a major design break.
 Does anybody have advice on this?

 best,
 Markus



[jira] [Created] (FLINK-1916) EOFException when running delta-iteration job

2015-04-21 Thread Stefan Bunk (JIRA)
Stefan Bunk created FLINK-1916:
--

 Summary: EOFException when running delta-iteration job
 Key: FLINK-1916
 URL: https://issues.apache.org/jira/browse/FLINK-1916
 Project: Flink
  Issue Type: Bug
 Environment: 0.9-milestone-1
Exception on the cluster, local execution works
Reporter: Stefan Bunk


The delta-iteration program in [1] ends with an

java.io.EOFException
    at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeLong(AbstractPagedOutputView.java:291)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeDouble(AbstractPagedOutputView.java:307)
    at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:62)
    at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:26)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
    at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
    at java.lang.Thread.run(Thread.java:745)

For logs and the accompanying mailing list discussion see below.

When running with a slightly different memory configuration, as hinted on the 
mailing list, I sometimes also get this exception:

19.Apr. 13:39:29 INFO  Task - IterationHead(WorksetIteration (Resolved-Redirects)) (10/10) switched to FAILED : java.lang.IndexOutOfBoundsException: Index: 161, Size: 161
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.resetTo(InMemoryPartition.java:352)
    at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.access$100(InMemoryPartition.java:301)
    at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:226)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
    at java.lang.Thread.run(Thread.java:745)

[1] Flink job: https://gist.github.com/knub/0bfec859a563009c1d57
[2] Job manager logs: https://gist.github.com/knub/01e3a4b0edb8cde66ff4
[3] One task manager's logs: https://gist.github.com/knub/8f2f953da95c8d7adefc
[4] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-when-running-Flink-job-td1092.html





Re: Akka transparency and serialisation

2015-04-21 Thread Stephan Ewen
Good point to raise, Paris.

Here are the practices I (and others) have been using; they work well:

1) Do not assume serialization, that is true. If you need to make sure that
the instance of the data is not shared after the message, send a manually
serialized version. The InstantiationUtil has methods to
serialize/deserialize into/from byte arrays.

2) If the data involves user-defined classes, always serialize manually,
because the deserialization in the Akka stack will not use the required
user-code class loader. Have a look at the class SerializedValue, which
serializes eagerly and deserializes lazily (with a given class loader) to
overcome these situations.

3) I totally agree that we should not make any assumptions about the
behavior of transient fields.

Stephan
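
For illustration, a plain-JDK sketch of option 2: pre-serialize the payload
before handing it to Akka and deserialize it later with the user-code class
loader. This only mirrors the idea behind InstantiationUtil and SerializedValue;
it is not the Flink classes themselves:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

public final class ManualSerialization {

    // Serialize the payload to a byte[] before sending, so no live instance is
    // shared between sender and receiver when Akka delivers the message locally.
    public static byte[] serialize(Object payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        }
        return bytes.toByteArray();
    }

    // Deserialize later, resolving classes against the user-code class loader
    // rather than the class loader of the actor system.
    public static Object deserialize(byte[] data, ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new UserCodeObjectInputStream(
                new ByteArrayInputStream(data), userCodeClassLoader)) {
            return in.readObject();
        }
    }

    private static final class UserCodeObjectInputStream extends ObjectInputStream {
        private final ClassLoader userCodeClassLoader;

        UserCodeObjectInputStream(InputStream in, ClassLoader userCodeClassLoader) throws IOException {
            super(in);
            this.userCodeClassLoader = userCodeClassLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, userCodeClassLoader);
            } catch (ClassNotFoundException e) {
                // fall back for primitives and classes visible only to the default loader
                return super.resolveClass(desc);
            }
        }
    }
}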


On Tue, Apr 21, 2015 at 1:41 PM, Paris Carbone par...@kth.se wrote:

 Hello everyone,


 Many of you are already aware of this but it is good to make it clear on
 the mailing list. We bumped into this special case with Akka several times
 already and it is important to know where transparency actually breaks.


 In short, Akka serialises only messages that get transferred over the wire
 or across JVMs [1]. Thus, we should not rely on messages getting serialised
 for anything we want to transfer using Akka. To overcome this we should
 either:


 1) Do a deep copy of everything passed via Akka messaging

 2) Apply serialisation manually before sending messages and transfer only
 pre-serialized data.

 3) Never rely on transient fields


 cheers

 Paris


 [1] http://doc.akka.io/docs/akka/snapshot/general/remoting.html




Re: Periodic full stream aggregations

2015-04-21 Thread Gyula Fóra
Hi Bruno,

Of course you can do that as well. (That's the good part :p )

I will open a PR soon with the proposed changes (first without breaking the
current Api) and I will post it here.

Cheers,
Gyula

On Tuesday, April 21, 2015, Bruno Cadonna cado...@informatik.hu-berlin.de
wrote:


 Hi Gyula,

 I have a question regarding your suggestion.

 Can the current continuous aggregation be also specified with your
 proposed periodic aggregation?

 I am thinking about something like

 dataStream.reduce(...).every(Count.of(1))

 Cheers,
 Bruno

 On 20.04.2015 22:32, Gyula Fóra wrote:
  Hey all,
 
  I think we are missing a quite useful feature that could be
  implemented (with some slight modifications) on top of the current
  windowing api.
 
   We currently provide two ways of aggregating (or reducing) over
   streams: doing a continuous aggregation and always outputting the
   aggregated value (which cannot be done properly in parallel), or
   doing the aggregation periodically in a window.
 
   What we don't have at the moment are periodic aggregations on the
   whole stream. I would even go as far as to remove the continuously
   outputting reduce/aggregate and replace it with this version, as
   this in turn can be done properly in parallel.
 
  My suggestion would be that a call:
 
  dataStream.reduce(..) dataStream.sum(..)
 
  would return a windowed data stream where the window is the whole
  record history, and the user would need to define a trigger to get
  the actual reduced values like:
 
  dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
  reduced results. dataStream.sum(...).every(...)
 
  I think the current data stream reduce/aggregation is very
  confusing without being practical for any normal use-case.
 
   Also, this would be a very API-breaking change (but I would still
  make this change as it is much more intuitive than the current
  behaviour) so I would try to push it before the release if we can
  agree.
 
  Cheers, Gyula
 

 - --
 ~~~

   Dr. Bruno Cadonna
   Postdoctoral Researcher

   Databases and Information Systems
   Department of Computer Science
   Humboldt-Universität zu Berlin

   http://www.informatik.hu-berlin.de/~cadonnab

 ~~~



Re: About Operator and OperatorBase

2015-04-21 Thread Stephan Ewen
Originally, we had multiple APIs with different data models: the current
Java API, the Record API, and a JSON API. The common API was the
data-model-agnostic set of operators on which they were built.

It became redundant once we saw how well things can be built on top of
the Java API using the TypeInformation. Now Scala, Python, and Dataflow
all build on top of the Java API.


Re: Periodic full stream aggregations

2015-04-21 Thread Fabian Hueske
Is it possible to switch the order of the statements, i.e.,

dataStream.every(Time.of(4,sec)).reduce(...) instead of
dataStream.reduce(...).every(Time.of(4,sec))

I think that would be more consistent with the structure of the remaining
API.

Cheers, Fabian

2015-04-21 10:57 GMT+02:00 Gyula Fóra gyf...@apache.org:

 Hi Bruno,

 Of course you can do that as well. (That's the good part :p )

 I will open a PR soon with the proposed changes (first without breaking the
 current Api) and I will post it here.

 Cheers,
 Gyula

 On Tuesday, April 21, 2015, Bruno Cadonna cado...@informatik.hu-berlin.de
 
 wrote:

 
  Hi Gyula,
 
  I have a question regarding your suggestion.
 
  Can the current continuous aggregation be also specified with your
  proposed periodic aggregation?
 
  I am thinking about something like
 
  dataStream.reduce(...).every(Count.of(1))
 
  Cheers,
  Bruno
 
  On 20.04.2015 22:32, Gyula Fóra wrote:
   Hey all,
  
   I think we are missing a quite useful feature that could be
   implemented (with some slight modifications) on top of the current
   windowing api.
  
    We currently provide two ways of aggregating (or reducing) over
    streams: doing a continuous aggregation and always outputting the
    aggregated value (which cannot be done properly in parallel), or
    doing the aggregation periodically in a window.
  
    What we don't have at the moment are periodic aggregations on the
    whole stream. I would even go as far as to remove the continuously
    outputting reduce/aggregate and replace it with this version, as
    this in turn can be done properly in parallel.
  
   My suggestion would be that a call:
  
   dataStream.reduce(..) dataStream.sum(..)
  
   would return a windowed data stream where the window is the whole
   record history, and the user would need to define a trigger to get
   the actual reduced values like:
  
   dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
   reduced results. dataStream.sum(...).every(...)
  
   I think the current data stream reduce/aggregation is very
   confusing without being practical for any normal use-case.
  
    Also, this would be a very API-breaking change (but I would still
   make this change as it is much more intuitive than the current
   behaviour) so I would try to push it before the release if we can
   agree.
  
   Cheers, Gyula
  
 
  - --
  ~~~
 
Dr. Bruno Cadonna
Postdoctoral Researcher
 
Databases and Information Systems
Department of Computer Science
Humboldt-Universität zu Berlin
 
http://www.informatik.hu-berlin.de/~cadonnab
 
  ~~~
 



[jira] [Created] (FLINK-1919) Add HCatOutputFormat for Tuple data types

2015-04-21 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-1919:


 Summary: Add HCatOutputFormat for Tuple data types
 Key: FLINK-1919
 URL: https://issues.apache.org/jira/browse/FLINK-1919
 Project: Flink
  Issue Type: New Feature
  Components: Java API, Scala API
Reporter: Fabian Hueske
Priority: Minor


It would be good to have an OutputFormat that can write data to HCatalog tables.

The Hadoop `HCatOutputFormat` expects `HCatRecord` objects and writes these to 
HCatalog tables. We can do the same thing by creating these `HCatRecord` 
objects with a Map function that precedes a `HadoopOutputFormat` wrapping the 
Hadoop `HCatOutputFormat`.

Better support for Flink Tuples can be added by implementing a custom 
`HCatOutputFormat` that also depends on the Hadoop `HCatOutputFormat` but 
internally converts Flink Tuples to `HCatRecords`. This would also include 
checking whether the schema of the HCatalog table and the Flink tuples match. 
For data types other than tuples, the OutputFormat could either require a 
preceding Map function that converts to `HCatRecords` or let users specify a 
MapFunction and invoke that internally.

We already have a Flink `HCatInputFormat`, which does this in the reverse 
direction, i.e., it emits Flink Tuples from HCatalog tables.
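
As a sketch of the interim approach (not the proposed custom OutputFormat), the 
preceding Map function could look roughly like this; the tuple arity, the field 
order, the NullWritable key, and the HCatalog package names are assumptions made 
for this example:

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;

// Converts a Flink Tuple2 into the (key, HCatRecord) pair expected by the
// Hadoop HCatOutputFormat when wrapped in Flink's HadoopOutputFormat.
// The field order must match the HCatalog table schema; no schema check is done here.
public class TupleToHCatRecord
        implements MapFunction<Tuple2<String, Integer>, Tuple2<NullWritable, DefaultHCatRecord>> {

    @Override
    public Tuple2<NullWritable, DefaultHCatRecord> map(Tuple2<String, Integer> value) {
        DefaultHCatRecord record = new DefaultHCatRecord(2);
        record.set(0, value.f0); // first tuple field  -> first table column
        record.set(1, value.f1); // second tuple field -> second table column
        return new Tuple2<NullWritable, DefaultHCatRecord>(NullWritable.get(), record);
    }
}
{code}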





[jira] [Created] (FLINK-1917) EOFException when running delta-iteration job

2015-04-21 Thread Stefan Bunk (JIRA)
Stefan Bunk created FLINK-1917:
--

 Summary: EOFException when running delta-iteration job
 Key: FLINK-1917
 URL: https://issues.apache.org/jira/browse/FLINK-1917
 Project: Flink
  Issue Type: Bug
  Components: Core, Distributed Runtime, Iterations
 Environment: 0.9-milestone-1
Exception on the cluster, local execution works
Reporter: Stefan Bunk


The delta-iteration program in [1] ends with an

java.io.EOFException
    at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeLong(AbstractPagedOutputView.java:291)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeDouble(AbstractPagedOutputView.java:307)
    at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:62)
    at org.apache.flink.api.common.typeutils.base.DoubleSerializer.serialize(DoubleSerializer.java:26)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
    at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
    at java.lang.Thread.run(Thread.java:745)

For logs and the accompanying mailing list discussion see below.

When running with a slightly different memory configuration, as hinted on the 
mailing list, I sometimes also get this exception:

19.Apr. 13:39:29 INFO  Task - IterationHead(WorksetIteration (Resolved-Redirects)) (10/10) switched to FAILED : java.lang.IndexOutOfBoundsException: Index: 161, Size: 161
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.resetTo(InMemoryPartition.java:352)
    at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.access$100(InMemoryPartition.java:301)
    at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:226)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
    at java.lang.Thread.run(Thread.java:745)

[1] Flink job: https://gist.github.com/knub/0bfec859a563009c1d57
[2] Job manager logs: https://gist.github.com/knub/01e3a4b0edb8cde66ff4
[3] One task manager's logs: https://gist.github.com/knub/8f2f953da95c8d7adefc
[4] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/EOFException-when-running-Flink-job-td1092.html





Re: About Operator and OperatorBase

2015-04-21 Thread Henry Saputra
Thanks for the explanation, Stephan. I always wondered why the extra
common APIs exist.

Then I think this should be high priority if we want to remove the
common API, to get rid of the unnecessary layer and dead code. As Ufuk
mentioned before, it is better to do it now, before more stuff is built
on top of Flink.

So removing the old Record API [1] and the tests depending on it is step
one of the process, but what is the JSON API?
- Henry

[1] https://issues.apache.org/jira/browse/FLINK-1681

On Tue, Apr 21, 2015 at 1:10 AM, Stephan Ewen se...@apache.org wrote:
 Originally, we had multiple APIs with different data models: the current
 Java API, the Record API, and a JSON API. The common API was the
 data-model-agnostic set of operators on which they were built.

 It became redundant once we saw how well things can be built on top of
 the Java API using the TypeInformation. Now Scala, Python, and Dataflow
 all build on top of the Java API.


Re: Periodic full stream aggregations

2015-04-21 Thread Gyula Fóra
Hey,

The current code supports two types of aggregation: a simple binary reduce,
(T, T) => T, and the grouped version of it, where the reduce function is
applied per user-defined key (so there we keep a map of reduced values).
This can already be used to implement fairly complex logic if we transform
the data to a proper type before passing it to the reducer.

As a next step we can make this work with fold + combiners as well, where
the initial data type is T, the fold function is (T, R) => R, and the
combiner is (R, R) => R.

At that point I think any sensible aggregation can be implemented.

Regards,
Gyula
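
To make those shapes concrete, here is a small self-contained Java example of
the fold and combiner contracts, using a distinct-count synopsis as the
accumulator. It only illustrates the (T, R) => R fold and (R, R) => R combine
signatures; it is not part of the streaming API discussed in this thread:

import java.util.HashSet;
import java.util.Set;

public final class DistinctCountSynopsis {

    // fold, (T, R) => R: add one element (T = String) to the synopsis (R = Set<String>)
    static Set<String> fold(String element, Set<String> synopsis) {
        synopsis.add(element);
        return synopsis;
    }

    // combiner, (R, R) => R: merge partial synopses, e.g. from parallel instances
    static Set<String> combine(Set<String> left, Set<String> right) {
        left.addAll(right);
        return left;
    }

    public static void main(String[] args) {
        Set<String> partial1 = new HashSet<String>();
        for (String s : new String[] {"a", "b", "a"}) {
            partial1 = fold(s, partial1);
        }

        Set<String> partial2 = new HashSet<String>();
        for (String s : new String[] {"b", "c"}) {
            partial2 = fold(s, partial2);
        }

        // a periodic trigger would emit this value every N seconds or elements
        System.out.println(combine(partial1, partial2).size()); // prints 3
    }
}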


On Tue, Apr 21, 2015 at 10:50 PM, Bruno Cadonna 
cado...@informatik.hu-berlin.de wrote:


 Hi Gyula,

 fair enough!

 I used a bad example.

 What I really wanted to know is whether your code supports only
 aggregations like sum, min, and max, where you only need to pass a single
 value to the next aggregation, or also more complex data structures, e.g.,
 a synopsis of the full stream, to compute an aggregation such as an
 approximate count distinct (item count)?

 Cheers,
 Bruno

 On 21.04.2015 15:18, Gyula Fóra wrote:
   You are right, but you should never try to compute a full-stream
   median; that's the point :D
 
  On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna 
  cado...@informatik.hu-berlin.de wrote:
 
  Hi Gyula,
 
   I read your comments on your PR.
 
   I have a question about this comment:
 
   It only allows aggregations, so we don't need to keep the full
   history in a buffer.
 
  What if the user implements an aggregation function like a median?
 
  For a median you need the full history, don't you?
 
  Am I missing something?
 
  Cheers, Bruno
 
  On 21.04.2015 14:31, Gyula Fóra wrote:
  I have opened a PR for this feature:
 
  https://github.com/apache/flink/pull/614
 
  Cheers, Gyula
 
  On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra
  gyula.f...@gmail.com wrote:
 
  Thats a good idea, I will modify my PR to that :)
 
  Gyula
 
  On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske
  fhue...@gmail.com wrote:
 
  Is it possible to switch the order of the statements,
  i.e.,
 
  dataStream.every(Time.of(4,sec)).reduce(...) instead of
  dataStream.reduce(...).every(Time.of(4,sec))
 
  I think that would be more consistent with the structure
  of the remaining API.
 
  Cheers, Fabian
 
  2015-04-21 10:57 GMT+02:00 Gyula Fóra
  gyf...@apache.org:
 
  Hi Bruno,
 
  Of course you can do that as well. (That's the good
  part :p )
 
  I will open a PR soon with the proposed changes (first
  without breaking
  the
  current Api) and I will post it here.
 
  Cheers, Gyula
 
  On Tuesday, April 21, 2015, Bruno Cadonna 
  cado...@informatik.hu-berlin.de
 
  wrote:
 
  Hi Gyula,
 
  I have a question regarding your suggestion.
 
  Can the current continuous aggregation be also specified with
  your proposed periodic aggregation?
 
  I am thinking about something like
 
  dataStream.reduce(...).every(Count.of(1))
 
  Cheers, Bruno
 
  On 20.04.2015 22:32, Gyula Fóra wrote:
  Hey all,
 
  I think we are missing a quite useful feature
  that could be implemented (with some slight
  modifications) on top of the current windowing
  api.
 
  We currently provide 2 ways of aggregating (or
  reducing) over streams: doing a continuous
  aggregation and always output the aggregated
  value (which cannot be done properly in parallel)
  or doing aggregation in a window periodically.
 
  What we don't have at the moment is periodic
  aggregations on the whole stream. I would even go
  as far as to remove the continuous outputting
  reduce/aggregate it and replace it with this
  version as this in return can be done properly in
  parallel.
 
  My suggestion would be that a call:
 
  dataStream.reduce(..) dataStream.sum(..)
 
  would return a windowed data stream where the
  window is the whole record history, and the user
  would need to define a trigger to get the actual
  reduced values like:
 
  dataStream.reduce(...).every(Time.of(4,sec)) to
  get the actual reduced results.
  dataStream.sum(...).every(...)
 
  I think the current data stream
  reduce/aggregation is very confusing without
  being practical for any normal use-case.
 
  Also this would be a very api breaking change
  (but I would still make this change as it is much
  more intuitive than the current behaviour) so I
  would try to push it before the release if we can
  agree.
 
  Cheers, Gyula
 
 
 
 
 
 
 
 
 
 
 

 - --
 ~~~

   Dr. Bruno Cadonna
   Postdoctoral Researcher

   Databases and Information Systems
   Department of Computer Science
   Humboldt-Universität zu Berlin

   http://www.informatik.hu-berlin.de/~cadonnab

 ~~~

Re: Periodic full stream aggregations

2015-04-21 Thread Bruno Cadonna

Hi Gyula,

fair enough!

I used a bad example.

What I really wanted to know is whether your code supports only
aggregations like sum, min, and max, where you only need to pass a single
value to the next aggregation, or also more complex data structures, e.g.,
a synopsis of the full stream, to compute an aggregation such as an
approximate count distinct (item count)?

Cheers,
Bruno

On 21.04.2015 15:18, Gyula Fóra wrote:
  You are right, but you should never try to compute a full-stream
  median; that's the point :D
 
 On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna  
 cado...@informatik.hu-berlin.de wrote:
 
 Hi Gyula,
 
  I read your comments on your PR.
 
  I have a question about this comment:
 
  It only allows aggregations, so we don't need to keep the full
  history in a buffer.
 
 What if the user implements an aggregation function like a median?
 
 For a median you need the full history, don't you?
 
 Am I missing something?
 
 Cheers, Bruno
 
 On 21.04.2015 14:31, Gyula Fóra wrote:
 I have opened a PR for this feature:
 
 https://github.com/apache/flink/pull/614
 
 Cheers, Gyula
 
 On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra
 gyula.f...@gmail.com wrote:
 
 Thats a good idea, I will modify my PR to that :)
 
 Gyula
 
 On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske 
 fhue...@gmail.com wrote:
 
 Is it possible to switch the order of the statements,
 i.e.,
 
 dataStream.every(Time.of(4,sec)).reduce(...) instead of 
 dataStream.reduce(...).every(Time.of(4,sec))
 
 I think that would be more consistent with the structure
 of the remaining API.
 
 Cheers, Fabian
 
 2015-04-21 10:57 GMT+02:00 Gyula Fóra
 gyf...@apache.org:
 
 Hi Bruno,
 
 Of course you can do that as well. (That's the good
 part :p )
 
 I will open a PR soon with the proposed changes (first 
 without breaking
 the
 current Api) and I will post it here.
 
 Cheers, Gyula
 
 On Tuesday, April 21, 2015, Bruno Cadonna 
 cado...@informatik.hu-berlin.de
 
 wrote:
 
 Hi Gyula,
 
 I have a question regarding your suggestion.
 
 Can the current continuous aggregation be also specified with
 your proposed periodic aggregation?
 
 I am thinking about something like
 
 dataStream.reduce(...).every(Count.of(1))
 
 Cheers, Bruno
 
 On 20.04.2015 22:32, Gyula Fóra wrote:
 Hey all,
 
 I think we are missing a quite useful feature
 that could be implemented (with some slight
 modifications) on top of the current windowing
 api.
 
 We currently provide 2 ways of aggregating (or 
 reducing) over streams: doing a continuous
 aggregation and always output the aggregated
 value (which cannot be done properly in parallel)
 or doing aggregation in a window periodically.
 
 What we don't have at the moment is periodic 
 aggregations on the whole stream. I would even go
 as far as to remove the continuous outputting 
 reduce/aggregate it and replace it with this
 version as this in return can be done properly in
 parallel.
 
 My suggestion would be that a call:
 
 dataStream.reduce(..) dataStream.sum(..)
 
 would return a windowed data stream where the
 window is the whole record history, and the user
 would need to define a trigger to get the actual
 reduced values like:
 
 dataStream.reduce(...).every(Time.of(4,sec)) to
 get the actual reduced results.
 dataStream.sum(...).every(...)
 
 I think the current data stream
 reduce/aggregation is very confusing without
 being practical for any normal use-case.
 
 Also this would be a very api breaking change
 (but I would still make this change as it is much
 more intuitive than the current behaviour) so I
 would try to push it before the release if we can
 agree.
 
 Cheers, Gyula
 
 
 
 
 
 
 
 
 
 
 

- -- 
~~~

  Dr. Bruno Cadonna
  Postdoctoral Researcher

  Databases and Information Systems
  Department of Computer Science
  Humboldt-Universität zu Berlin

  http://www.informatik.hu-berlin.de/~cadonnab

~~~


Re: Periodic full stream aggregations

2015-04-21 Thread Gyula Fóra
Thats a good idea, I will modify my PR to that :)

Gyula

On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske fhue...@gmail.com wrote:

 Is it possible to switch the order of the statements, i.e.,

 dataStream.every(Time.of(4,sec)).reduce(...) instead of
 dataStream.reduce(...).every(Time.of(4,sec))

 I think that would be more consistent with the structure of the remaining
 API.

 Cheers, Fabian

 2015-04-21 10:57 GMT+02:00 Gyula Fóra gyf...@apache.org:

  Hi Bruno,
 
  Of course you can do that as well. (That's the good part :p )
 
  I will open a PR soon with the proposed changes (first without breaking
 the
  current Api) and I will post it here.
 
  Cheers,
  Gyula
 
  On Tuesday, April 21, 2015, Bruno Cadonna 
 cado...@informatik.hu-berlin.de
  
  wrote:
 
  
   Hi Gyula,
  
   I have a question regarding your suggestion.
  
   Can the current continuous aggregation be also specified with your
   proposed periodic aggregation?
  
   I am thinking about something like
  
   dataStream.reduce(...).every(Count.of(1))
  
   Cheers,
   Bruno
  
   On 20.04.2015 22:32, Gyula Fóra wrote:
Hey all,
   
I think we are missing a quite useful feature that could be
implemented (with some slight modifications) on top of the current
windowing api.
   
 We currently provide two ways of aggregating (or reducing) over
 streams: doing a continuous aggregation and always outputting the
 aggregated value (which cannot be done properly in parallel), or
 doing the aggregation periodically in a window.
   
 What we don't have at the moment are periodic aggregations on the
 whole stream. I would even go as far as to remove the continuously
 outputting reduce/aggregate and replace it with this version, as
 this in turn can be done properly in parallel.
   
My suggestion would be that a call:
   
dataStream.reduce(..) dataStream.sum(..)
   
would return a windowed data stream where the window is the whole
record history, and the user would need to define a trigger to get
the actual reduced values like:
   
dataStream.reduce(...).every(Time.of(4,sec)) to get the actual
reduced results. dataStream.sum(...).every(...)
   
I think the current data stream reduce/aggregation is very
confusing without being practical for any normal use-case.
   
 Also, this would be a very API-breaking change (but I would still
make this change as it is much more intuitive than the current
behaviour) so I would try to push it before the release if we can
agree.
   
Cheers, Gyula
   
  
   - --
   ~~~
  
 Dr. Bruno Cadonna
 Postdoctoral Researcher
  
 Databases and Information Systems
 Department of Computer Science
 Humboldt-Universität zu Berlin
  
 http://www.informatik.hu-berlin.de/~cadonnab
  
   ~~~
  
 



Akka transparency and serialisation

2015-04-21 Thread Paris Carbone
Hello everyone,


Many of you are already aware of this but it is good to make it clear on the 
mailing list. We bumped into this special case with Akka several times already and 
it is important to know where transparency actually breaks.


In short, Akka serialises only messages that get transferred over the wire or 
across JVMs [1]. Thus, we should not rely on messages getting serialised for 
anything we want to transfer using Akka. To overcome this we should either:


1) Do a deep copy of everything passed via Akka messaging

2) Apply serialisation manually before sending messages and transfer only 
pre-serialized data.

3) Never rely on transient fields


cheers

Paris


[1] http://doc.akka.io/docs/akka/snapshot/general/remoting.html



[jira] [Created] (FLINK-1918) NullPointerException at org.apache.flink.client.program.Client's constructor while using ExecutionEnvironment.createRemoteEnvironment

2015-04-21 Thread JIRA
Zoltán Zvara created FLINK-1918:
---

 Summary: NullPointerException at 
org.apache.flink.client.program.Client's constructor while using 
ExecutionEnvironment.createRemoteEnvironment
 Key: FLINK-1918
 URL: https://issues.apache.org/jira/browse/FLINK-1918
 Project: Flink
  Issue Type: Bug
  Components: YARN Client
Reporter: Zoltán Zvara


Trace:

{code}
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.client.program.Client.<init>(Client.java:104)
    at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:86)
    at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
    at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:70)
    at Wordcount.main(Wordcount.java:23)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
{code}

The constructor is trying to set the configuration parameter 
{{jobmanager.rpc.address}} with 
{{jobManagerAddress.getAddress().getHostAddress()}}, but 
{{jobManagerAddress.holder.addr}} is {{null}}. 
{{jobManagerAddress.holder.hostname}} and {{port}} hold the valid information.
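
A minimal plain-JDK illustration of the failure mode (not Flink code): for an 
unresolved InetSocketAddress, getAddress() returns null, so calling 
getHostAddress() on it throws the NullPointerException, while getHostName() and 
getPort() still hold the valid information. The host name and port below are 
made up for the example.

{code}
import java.net.InetSocketAddress;

public class UnresolvedAddressDemo {
    public static void main(String[] args) {
        // hypothetical JobManager host and port
        InetSocketAddress jm =
                InetSocketAddress.createUnresolved("jobmanager.example", 6123);
        System.out.println(jm.getHostName()); // jobmanager.example
        System.out.println(jm.getPort());     // 6123
        System.out.println(jm.getAddress());  // null -> .getHostAddress() on it would NPE
    }
}
{code}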


