Re: How to run wordcount program on Multi-node Cluster.

2017-08-08 Thread Felix Neutatz
Hi,

As Timo said, you need a distributed file system such as HDFS, so that all
task managers can access the same input and output paths.
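For illustration, here is a minimal WordCount sketch where input and output
live on a distributed file system, so every task manager sees the same data;
the hdfs:// paths are placeholders, not the ones from this thread:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountOnDfs {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.readTextFile("hdfs:///shared/hamlet.txt")           // readable from every TM
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.toLowerCase().split("\\W+")) {
                        if (!word.isEmpty()) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                }
            })
            .keyBy(0)
            .sum(1)
            .writeAsText("hdfs:///shared/wordcount_out");       // writable from every TM
        env.execute("WordCount on a distributed file system");
    }
}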

Best regards,
Felix

On Aug 8, 2017 09:01, "P. Ramanjaneya Reddy"  wrote:

Hi Timo,

How can I make the files accessible across TMs?

Thanks & Regards,
Ramanji.

On Mon, Aug 7, 2017 at 7:45 PM, Timo Walther  wrote:

> Flink is distributed software for clusters. You need something like a
> distributed file system so that the input and output files can be accessed
> from all nodes.
>
> Each TM has a log directory where the execution logs are stored.
>
> You can set additional properties on the output format by importing the
> example code into your IDE and adapting it.
>
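For example, overwriting can be enabled on the text sink like this (a minimal
sketch; the counts stream and outputPath variable are assumed names from the
example, not code quoted in this thread):

import org.apache.flink.core.fs.FileSystem.WriteMode;

// WriteMode.OVERWRITE replaces an existing output file instead of failing
// with "File or directory already exists" in the default NO_OVERWRITE mode.
counts.writeAsText(outputPath, WriteMode.OVERWRITE);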
> On 07.08.17 at 16:03, P. Ramanjaneya Reddy wrote:
>
> Hi Timo,
>> The problem is resolved after copying the input file to all task managers.
>>
>> And where should the output file be generated? On the jobmanager or on a
>> task manager?
>>
>> Where can I see the execution logs to understand how the word count is done
>> on each task manager?
>>
>>
>> By the way, is there any option to overwrite...?
>>
>> 08/07/2017 19:27:00 Keyed Aggregation -> Sink: Unnamed(1/1) switched to
>> FAILED
>> java.io.IOException: File or directory already exists. Existing files and
>> directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to
>> overwrite existing files and directories.
>> at org.apache.flink.core.fs.FileSystem.initOutPathLocalFS(FileSystem.java:763)
>> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.initOutPathLocalFS(SafetyNetWrapperFileSystem.java:135)
>> at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:231)
>> at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:78)
>> at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:61)
>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> On Mon, Aug 7, 2017 at 6:49 PM, Timo Walther  wrote:
>>
>> Make sure that the file exists and is accessible from all Flink task
>>> managers.
>>>
>>>
>>> On 07.08.17 at 14:35, P. Ramanjaneya Reddy wrote:
>>>
>>> Thank you Timo.


 root1@root1-HP-EliteBook-840-G2:~/NAI/Tools/BEAM/Flink_Cluster/rama/flink$
 ./bin/flink run ./examples/streaming/WordCount.jar --input file:///home/root1/hamlet.txt --output file:///home/root1/wordcount_out



 Execution of the WordCount jar gives an error...

 08/07/2017 18:03:16 Source: Custom File Source(1/1) switched to FAILED
 java.io.FileNotFoundException: The provided file path
 file:/home/root1/hamlet.txt does not exist.
 at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:192)
 at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
 at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
 at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
 at java.lang.Thread.run(Thread.java:748)


 On Mon, Aug 7, 2017 at 5:56 PM, Timo Walther 
 wrote:

 Hi Ramanji,

> you can find the source code of the examples here:
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
>
> A general introduction how the cluster execution works can be found
> here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/programming-model.html#programs-and-dataflows
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/runtime.html
>
> It might also be helpful to have a look at the web interface which can
> show you a nice graph of the job.
>
> I hope this helps. Feel free to ask further questions.
>
> Regards,
> Timo
>
>
> On 07.08.17 at 14:00, P. Ramanjaneya Reddy wrote:
>
> Hello Everyone,
>
> I have followed the steps specified below link to Install & Run Apache
>> Flink on Multi-node Cluster.
>>
>> http://data-flair.training/blogs/install-run-deploy-flink-multi-node-cluster/
>> used flink-1.3.2-bin-hadoop27-scala_2.10.tgz for install
>>
>> usin

[jira] [Created] (FLINK-7029) Documentation for WindowFunction is confusing

2017-06-28 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-7029:


 Summary: Documentation for WindowFunction is confusing
 Key: FLINK-7029
 URL: https://issues.apache.org/jira/browse/FLINK-7029
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Felix Neutatz
Priority: Trivial


Hi,

in the [example of the WindowFunction in the 
documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction---the-generic-case]
we use WindowFunction<..., String, String, TimeWindow>. That means that our key 
data type is a String. For me, this is highly confusing, since we can only have a 
String data type for the key if we implement a custom key selector. Usually people, 
especially beginners, will use something like keyBy(ID) or keyBy("attributeName"), 
which will always return a tuple, e.g. a Tuple1. It would be great if somebody could 
change the key type in the example to a tuple. I am sure this might help beginners 
to understand that by default the key type is a tuple.

Moreover, another suggestion would be to change keyBy() so that if we key by just 
one attribute, we return that type directly instead of wrapping it in a Tuple1.
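For illustration, a sketch of what a tuple-keyed version of the example could
look like; the class name, input type, and field names here are made up and are
not the documentation's code:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// After keyBy(0) or keyBy("f0"), the key handed to the WindowFunction is a
// Tuple (here a Tuple1 holding the String field), not a plain String.
public class CountPerKeyWindowFunction
        implements WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window,
                      Iterable<Tuple2<String, Long>> input, Collector<String> out) {
        String word = ((Tuple1<String>) key).f0;   // unwrap the Tuple1 key
        long count = 0;
        for (Tuple2<String, Long> ignored : input) {
            count++;
        }
        out.collect(window + ": " + word + " -> " + count);
    }
}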



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Wish to Contribute - Andrea Spina

2017-02-17 Thread Felix Neutatz
Great to have you Andrea :)

On Feb 17, 2017 15:21, "Aljoscha Krettek"  wrote:

> Welcome to the community, Andrea! :-)
>
> On Fri, 17 Feb 2017 at 10:22 Fabian Hueske  wrote:
>
> > Hi Andrea,
> >
> > welcome to the community!
> > I gave you Contributor permissions. You can now assign issues to
> yourself.
> >
> > Best, Fabian
> >
> > 2017-02-17 9:14 GMT+01:00 Andrea Spina :
> >
> > > Dear Gordon,
> > >
> > > Thank you so much. My JIRA id is spi-x-i. Can't wait to help.
> > >
> > > Cheers,
> > > Andrea
> > >
> > >
> > >
> > >
> > > --
> > > View this message in context: http://apache-flink-mailing-
> > > list-archive.1008284.n3.nabble.com/Wish-to-Contribute-
> > > Andrea-Spina-tp15991p15996.html
> > > Sent from the Apache Flink Mailing List archive. mailing list archive
> at
> > > Nabble.com.
> > >
> >
>


Re: New Flink team member - Kate Eri.

2017-02-13 Thread Felix Neutatz
 > engines / native solvers (i.e. CPU/GPU).
> > >
> > > https://github.com/apache/mahout/tree/master/viennacl-omp
> > > https://github.com/apache/mahout/tree/master/viennacl
> > >
> > > Best,
> > > tg
> > >
> > >
> > > Trevor Grant
> > > Data Scientist
> > > https://github.com/rawkintrevo
> > > http://stackexchange.com/users/3002022/rawkintrevo
> > > http://trevorgrant.org
> > >
> > > *"Fortunate is he, who is able to know the causes of things."  -Virgil*
> > >
> > >
> > > On Fri, Feb 10, 2017 at 7:01 AM, Katherin Eri 
> > > wrote:
> > >
> > >> Thank you Felix, for provided information.
> > >>
> > >> Currently I analyze the provided integration of Flink with SystemML.
> > >>
> > >> And also gather the information for the ticket  FLINK-1730
> > >> <https://issues.apache.org/jira/browse/FLINK-1730>, maybe we will
> take
> > it
> > >> to work, to unlock SystemML/Flink integration.
> > >>
> > >>
> > >>
> > >> чт, 9 февр. 2017 г. в 0:17, Felix Neutatz
>  > >> d>:
> > >>
> > >> > Hi Kate,
> > >> >
> > >> > 1) - Broadcast:
> > >> >
> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+
> > >> Only+send+data+to+each+taskmanager+once+for+broadcasts
> > >> >  - Caching: https://issues.apache.org/jira/browse/FLINK-1730
> > >> >
> > >> > 2) I have no idea about the GPU implementation. The SystemML mailing
> > >> list
> > >> > will probably help you out their.
> > >> >
> > >> > Best regards,
> > >> > Felix
> > >> >
> > >> > 2017-02-08 14:33 GMT+01:00 Katherin Eri :
> > >> >
> > >> > > Thank you Felix, for your point, it is quite interesting.
> > >> > >
> > >> > > I will take a look at the code, of the provided Flink integration.
> > >> > >
> > >> > > 1)You have these problems with Flink: >>we realized that the
> > lack
> > >> of
> > >> > a
> > >> > > caching operator and a broadcast issue highly effects the
> > performance,
> > >> > have
> > >> > > you already asked about this the community? In case yes: please
> > >> provide
> > >> > the
> > >> > > reference to the ticket or the topic of letter.
> > >> > >
> > >> > > 2)You have said, that SystemML provides GPU support. I have
> seen
> > >> > > SystemML’s source code and would like to ask: why you have decided
> > to
> > >> > > implement your own integration with cuda? Did you try to consider
> > >> ND4J,
> > >> > or
> > >> > > because it is younger, you support your own implementation?
> > >> > >
> > >> > > вт, 7 февр. 2017 г. в 18:35, Felix Neutatz <
> neut...@googlemail.com
> > >:
> > >> > >
> > >> > > > Hi Katherin,
> > >> > > >
> > >> > > > we are also working in a similar direction. We implemented a
> > >> prototype
> > >> > to
> > >> > > > integrate with SystemML:
> > >> > > > https://github.com/apache/incubator-systemml/pull/119
> > >> > > > SystemML provides many different matrix formats, operations, GPU
> > >> > support
> > >> > > > and a couple of DL algorithms. Unfortunately, we realized that
> the
> > >> lack
> > >> > > of
> > >> > > > a caching operator and a broadcast issue highly effects the
> > >> performance
> > >> > > > (e.g. compared to Spark). At the moment I am trying to tackle
> the
> > >> > > broadcast
> > >> > > > issue. But caching is still a problem for us.
> > >> > > >
> > >> > > > Best regards,
> > >> > > > Felix
> > >> > > >
> > >> > > > 2017-02-07 16:22 GMT+01:00 Katherin Eri  >:
> > >> > > >
> > >> > > > > Thank you, Till.
> > >> > > > >
> > >> > > > > 1)  Regarding N

Re: New Flink team member - Kate Eri.

2017-02-08 Thread Felix Neutatz
Hi Kate,

1) - Broadcast:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
 - Caching: https://issues.apache.org/jira/browse/FLINK-1730

2) I have no idea about the GPU implementation. The SystemML mailing list
will probably help you out there.

Best regards,
Felix

2017-02-08 14:33 GMT+01:00 Katherin Eri :

> Thank you Felix, for your point, it is quite interesting.
>
> I will take a look at the code, of the provided Flink integration.
>
> 1)You have these problems with Flink: >>we realized that the lack of a
> caching operator and a broadcast issue highly effects the performance, have
> you already asked about this the community? In case yes: please provide the
> reference to the ticket or the topic of letter.
>
> 2)You have said, that SystemML provides GPU support. I have seen
> SystemML’s source code and would like to ask: why you have decided to
> implement your own integration with cuda? Did you try to consider ND4J, or
> because it is younger, you support your own implementation?
>
> On Tue, 7 Feb 2017 at 18:35, Felix Neutatz :
>
> > Hi Katherin,
> >
> > we are also working in a similar direction. We implemented a prototype to
> > integrate with SystemML:
> > https://github.com/apache/incubator-systemml/pull/119
> > SystemML provides many different matrix formats, operations, GPU support
> > and a couple of DL algorithms. Unfortunately, we realized that the lack
> of
> > a caching operator and a broadcast issue highly effects the performance
> > (e.g. compared to Spark). At the moment I am trying to tackle the
> broadcast
> > issue. But caching is still a problem for us.
> >
> > Best regards,
> > Felix
> >
> > 2017-02-07 16:22 GMT+01:00 Katherin Eri :
> >
> > > Thank you, Till.
> > >
> > > 1)  Regarding ND4J, I didn’t know about such a pity and critical
> > > restriction of it -> lack of sparsity optimizations, and you are right:
> > > this issue is still actual for them. I saw that Flink uses Breeze, but
> I
> > > thought its usage caused by some historical reasons.
> > >
> > > 2)  Regarding integration with DL4J, I have read the source code of
> > > DL4J/Spark integration, that’s why I have declined my idea of reuse of
> > > their word2vec implementation for now, for example. I can perform
> deeper
> > > investigation of this topic, if it required.
> > >
> > >
> > >
> > > So I feel that we have the following picture:
> > >
> > > 1)  DL integration investigation, could be part of Apache Bahir. I
> > can
> > > perform futher investigation of this topic, but I thik we need some
> > > separated ticket for this to track this activity.
> > >
> > > 2)  GPU support, required for DL is interesting, but requires ND4J
> > for
> > > example.
> > >
> > > 3)  ND4J couldn’t be incorporated because it doesn’t support
> sparsity
> > > <https://deeplearning4j.org/roadmap.html> [1].
> > >
> > > Regarding ND4J is this the single blocker for incorporation of it or
> may
> > be
> > > some others known?
> > >
> > >
> > > [1] https://deeplearning4j.org/roadmap.html
> > >
> > > вт, 7 февр. 2017 г. в 16:26, Till Rohrmann :
> > >
> > > Thanks for initiating this discussion Katherin. I think you're right
> that
> > > in general it does not make sense to reinvent the wheel over and over
> > > again. Especially if you only have limited resources at hand. So if we
> > > could integrate Flink with some existing library that would be great.
> > >
> > > In the past, however, we couldn't find a good library which provided
> > enough
> > > freedom to integrate it with Flink. Especially if you want to have
> > > distributed and somewhat high-performance implementations of ML
> > algorithms
> > > you would have to take Flink's execution model (capabilities as well as
> > > limitations) into account. That is mainly the reason why we started
> > > implementing some of the algorithms "natively" on Flink.
> > >
> > > If I remember correctly, then the problem with ND4J was and still is
> that
> > > it does not support sparse matrices which was a requirement from our
> > side.
> > > As far as I know, it is quite common that you have sparse data
> structures
> > > when dealing with large scale problems. That's why we built our own
> > > abstracti

Re: New Flink team member - Kate Eri.

2017-02-07 Thread Felix Neutatz
Hi Katherin,

we are also working in a similar direction. We implemented a prototype to
integrate with SystemML:
https://github.com/apache/incubator-systemml/pull/119
SystemML provides many different matrix formats, operations, GPU support
and a couple of DL algorithms. Unfortunately, we realized that the lack of
a caching operator and a broadcast issue highly affect the performance
(e.g. compared to Spark). At the moment I am trying to tackle the broadcast
issue. But caching is still a problem for us.

Best regards,
Felix

2017-02-07 16:22 GMT+01:00 Katherin Eri :

> Thank you, Till.
>
> 1)  Regarding ND4J, I didn’t know about such a pity and critical
> restriction of it -> lack of sparsity optimizations, and you are right:
> this issue is still actual for them. I saw that Flink uses Breeze, but I
> thought its usage caused by some historical reasons.
>
> 2)  Regarding integration with DL4J, I have read the source code of
> DL4J/Spark integration, that’s why I have declined my idea of reuse of
> their word2vec implementation for now, for example. I can perform deeper
> investigation of this topic, if it required.
>
>
>
> So I feel that we have the following picture:
>
> 1)  DL integration investigation, could be part of Apache Bahir. I can
> perform futher investigation of this topic, but I thik we need some
> separated ticket for this to track this activity.
>
> 2)  GPU support, required for DL is interesting, but requires ND4J for
> example.
>
> 3)  ND4J couldn’t be incorporated because it doesn’t support sparsity
>  [1].
>
> Regarding ND4J is this the single blocker for incorporation of it or may be
> some others known?
>
>
> [1] https://deeplearning4j.org/roadmap.html
>
> On Tue, 7 Feb 2017 at 16:26, Till Rohrmann :
>
> Thanks for initiating this discussion Katherin. I think you're right that
> in general it does not make sense to reinvent the wheel over and over
> again. Especially if you only have limited resources at hand. So if we
> could integrate Flink with some existing library that would be great.
>
> In the past, however, we couldn't find a good library which provided enough
> freedom to integrate it with Flink. Especially if you want to have
> distributed and somewhat high-performance implementations of ML algorithms
> you would have to take Flink's execution model (capabilities as well as
> limitations) into account. That is mainly the reason why we started
> implementing some of the algorithms "natively" on Flink.
>
> If I remember correctly, then the problem with ND4J was and still is that
> it does not support sparse matrices which was a requirement from our side.
> As far as I know, it is quite common that you have sparse data structures
> when dealing with large scale problems. That's why we built our own
> abstraction which can have different implementations. Currently, the
> default implementation uses Breeze.
>
> I think the support for GPU based operations and the actual resource
> management are two orthogonal things. The implementation would have to work
> with no GPUs available anyway. If the system detects that GPUs are
> available, then ideally it would exploit them. Thus, we could add this
> feature later and maybe integrate it with FLINK-5131 [1].
>
> Concerning the integration with DL4J I think that Theo's proposal to do it
> in a separate repository (maybe as part of Apache Bahir) is a good idea.
> We're currently thinking about outsourcing some of Flink's libraries into
> sub projects. This could also be an option for the DL4J integration then.
> In general I think it should be feasible to run DL4J on Flink given that it
> also runs on Spark. Have you already looked at it closer?
>
> [1] https://issues.apache.org/jira/browse/FLINK-5131
>
> Cheers,
> Till
>
> On Tue, Feb 7, 2017 at 11:47 AM, Katherin Eri 
> wrote:
>
> > Thank you Theodore, for your reply.
> >
> > 1)Regarding GPU, your point is clear and I agree with it, ND4J looks
> > appropriate. But, my current understanding is that, we also need to cover
> > some resource management questions -> when we need to provide GPU support
> > we also need to manage it like resource. For example, Mesos has already
> > supported GPU like resource item: Initial support for GPU resources.
> > 
> > Flink
> > uses Mesos as cluster manager, and this means that this feature of Mesos
> > could be reused. Also memory managing questions in Flink regarding GPU
> > should be clarified.
> >
> > 2)Regarding integration with DL4J: what stops us to initialize ticket
> > and start the discussion around this topic? We need some user story or
> the
> > community is not sure that DL is really helpful? Why the discussion with
> > Adam
> > Gibson just finished with no implementation of any idea? What concerns do
> > we have?
> >
> > пн, 6 февр. 2017 г. в 15:01, Theodore Vasiloudis <
> > theodoros.vasilou...@gmail.

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-12-02 Thread Felix Neutatz
Hi everybody,

I implemented the second approach (see
https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts).
So each subpartition will be read by m tasks (m = number of task managers) and
the other tasks will notify the subpartition that they don't need to read it.
This solves the problem, and we release the subpartition only when we don't
need it anymore.

The message I send for all tasks which don't need to read is
"notifySubpartitionConsumed()":
https://github.com/FelixNeutatz/incubator-flink/blob/1b58d9c9df89620f2557b59e7fde40ffe04f49d8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L460

This means I first have to connect via a PartitionRequest and then I will
notify all channels.
One problem of using the standard PartitionRequest is that we will already
fill the first buffer:
https://github.com/FelixNeutatz/incubator-flink/blob/oneSubpartition/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java#L123

So my question is: is that OK, or should we
1) introduce another Netty message, PartitionRequestAndNotifyConsumed, or
2) extend the PartitionRequest with the attribute "boolean getAhead"?

Current problems:
Native iterations:
Native iterations work but are not optimized. Theoretically, in the case of
native iterations we can also notify the subpartitions instead of reading
them, but at the moment I get the following exception when I do so:
java.lang.IllegalStateException: Queried for a buffer before requesting the
subpartition.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:152)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:424)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:87)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
at org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization.materializeVariable(BroadcastVariableMaterialization.java:114)

In general, it seems to me that this also reduces network traffic for pipelined
partitions in the "old" architecture. I will investigate further why this does
not yet work for native iterations. For a simple map job with pipelined
partitions it already works, so there has to be some iteration-specific
behaviour that keeps it from working.

But for now I would propose to first push the improvements to Flink without
the iteration improvements. The overhead of the two code paths is just two
lines of code, which is really little.

I am happy to hear your thoughts :)

Best regards,
Felix

2016-11-10 12:24 GMT+01:00 Felix Neutatz :

> Hi everybody,
>
> the previous approach turned out to have an issue. Since we only write to
> one subpartition, we have N-1 empty subpartitions per Task (where N =
> degree of parallelism). In the current approach I didn't consume these
> empty subpartitions. When you don't consume a subpartition it won't be
> released. So we have a memory leak.
>
> One workaround would be to read the empty subpartitions. But this is a
> really ugly work-around.
>
> So I had a chat with Till and we decided to create only one subpartition
> instead of N subpartitions per task. I have already implemented this
> approach.
>
> Now the problem is that we need to know, when to release this
> subpartition. We will create M subpartition-views per subpartition (where M
> is the number of task managers & M <= N).
>
> There are many ways to solve this problem:
> 1. Tell the subpartition how many taskmanagers will consume it.
> (=> propagate M)
> 2. All tasks which don't need to read the subpartition, send a message to
> the subpartition. So the subpartition will receive M release requests and
> N-M "I am done" requests. So when the subpartition knows the number of
> parallelism N, we are fine. (=> propagate N)
>
> Any thoughts how to tackle this problem?
>
> Best regards,
> Felix
>
> 2016-08-10 19:14 GMT+02:00 Till Rohrmann :
>
>> Cool first version Felix :-)
>>
>> On Wed, Aug 10, 2016 at 6:48 PM, Stephan Ewen  wrote:
>>
>> > Cool, nice results!
>> >
>> > For the iteration unspecialization - we probably should design this
>> hand in
>> > hand with the streaming fault tolerance, as they share the notion of
>> > "intermediate result versions".
>> >
>> >
>> > On Wed, Aug 10, 20

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-11-10 Thread Felix Neutatz
Hi everybody,

the previous approach turned out to have an issue. Since we only write to
one subpartition, we have N-1 empty subpartitions per Task (where N =
degree of parallelism). In the current approach I didn't consume these
empty subpartitions. When you don't consume a subpartition it won't be
released. So we have a memory leak.

One workaround would be to read the empty subpartitions. But this is a
really ugly work-around.

So I had a chat with Till and we decided to create only one subpartition
instead of N subpartitions per task. I have already implemented this
approach.

Now the problem is that we need to know when to release this subpartition.
We will create M subpartition views per subpartition (where M is the number
of task managers and M <= N).

There are many ways to solve this problem:
1. Tell the subpartition how many taskmanagers will consume it.
(=> propagate M)
2. All tasks which don't need to read the subpartition send a message to
the subpartition. So the subpartition will receive M release requests and
N-M "I am done" requests. So when the subpartition knows the number of
parallelism N, we are fine. (=> propagate N)

Any thoughts how to tackle this problem?
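For what it's worth, option 2 essentially boils down to counting both kinds of
messages until all N consumers have reported; a rough sketch with made-up
names, not actual Flink code:

// Releases the single subpartition once all N consumers (M readers plus
// N-M "I am done" notifications) have reported in.
public class SubpartitionReleaseCounter {
    private final int totalConsumers;   // N = degree of parallelism, must be propagated
    private int reportsReceived;

    public SubpartitionReleaseCounter(int totalConsumers) {
        this.totalConsumers = totalConsumers;
    }

    /** Called for every release request and every "I am done" notification. */
    public synchronized boolean onConsumerReport() {
        reportsReceived++;
        return reportsReceived == totalConsumers;   // true => safe to release
    }
}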

Best regards,
Felix

2016-08-10 19:14 GMT+02:00 Till Rohrmann :

> Cool first version Felix :-)
>
> On Wed, Aug 10, 2016 at 6:48 PM, Stephan Ewen  wrote:
>
> > Cool, nice results!
> >
> > For the iteration unspecialization - we probably should design this hand
> in
> > hand with the streaming fault tolerance, as they share the notion of
> > "intermediate result versions".
> >
> >
> > On Wed, Aug 10, 2016 at 6:09 PM, Felix Neutatz 
> > wrote:
> >
> > > Hi everybody,
> > >
> > > I found a quick and dirty way to make the blocking subpartition
> readable
> > by
> > > multiple readers. In the JobGraph generation I make all broadcast
> > > partitions blocking (see more details here:
> > > https://github.com/FelixNeutatz/incubator-flink/
> > > commits/blockingMultipleReads).
> > > I want to point out that this branch is only experimental!
> > >
> > > This works for the simple Map().withBroadcastSet() use case.
> > >
> > > To test this approach, I run our peel bundle flink-broadcast (
> > > https://github.com/TU-Berlin-DIMA/flink-broadcast) on the IBM power
> > > cluster. Ibm-power has 8 nodes and we scale the number of slots per
> node
> > > from 1 - 16:
> > >
> > > broadcast.ibm-power-1 broadcast.01 6597.33
> > > broadcast.ibm-power-1 broadcast.02 5997
> > > broadcast.ibm-power-1 broadcast.04 6576.67
> > > broadcast.ibm-power-1 broadcast.08 7024.33
> > > broadcast.ibm-power-1 broadcast.16 6933.33
> > >
> > > The last row is the averaged run time in milliseconds over 3 runs. You
> > can
> > > clearly see, that the run time stays constant :)
> > >
> > > As discussed, this approach doesn't work yet for native iterations (see
> > > FLINK-1713).
> > >
> > > So in the next weeks I will work on the native iterations as Stephan
> > > proposed.
> > >
> > > Best regards,
> > > Felix
> > >
> > >
> > >
> > > 2016-08-09 21:29 GMT+07:00 Stephan Ewen :
> > >
> > > > I agree with Till. Changing the basic data exchange mechanism would
> > screw
> > > > up many other ongoing efforts, like more incremental recovery.
> > > >
> > > > It seems to make this properly applicable, we need to first
> > un-specialize
> > > > the iterations.
> > > >
> > > > (1) Allow for "versioned" intermediate results, i.e.,
> > > result-x-superstep1,
> > > > result-x-superstep2, result-x-superstep3, result-x-superstep4, ...
> > > > We need something similar for fined grained recovery in streaming
> > > > (result-x-checkpoint1, result-x-checkpoint2, result-x-checkpoint3,
> > > > result-x-checkpoint4, ...) so it may be worth addressing that soon
> > > anyways.
> > > >
> > > > (2) Make iterations not dependent on the special local back channel.
> > > > Then we can simply schedule iterations like all other things.
> > > >
> > > > (3) Do the actual FLIP-5 proposal
> > > >
> > > >
> > > > That's quite an effort, but I fear all else will break the engine and
> > > other
> > > > efforts.
> > > >
> > > > Best,
> > > > Stephan
> > > >
&

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-08-10 Thread Felix Neutatz
Hi everybody,

I found a quick and dirty way to make the blocking subpartition readable by
multiple readers. In the JobGraph generation I make all broadcast
partitions blocking (see more details here:
https://github.com/FelixNeutatz/incubator-flink/commits/blockingMultipleReads).
I want to point out that this branch is only experimental!

This works for the simple Map().withBroadcastSet() use case.
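For reference, the broadcast pattern exercised by this use case is roughly the
following (a minimal sketch, not the benchmark code itself):

import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class BroadcastMapSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
        DataSet<String> data = env.fromElements("a", "b", "c");

        data.map(new RichMapFunction<String, String>() {
                private List<Integer> broadcastSet;

                @Override
                public void open(Configuration parameters) {
                    // the broadcast set is materialized on the consuming side
                    broadcastSet = getRuntimeContext().getBroadcastVariable("bcSet");
                }

                @Override
                public String map(String value) {
                    return value + "-" + broadcastSet.size();
                }
            })
            .withBroadcastSet(toBroadcast, "bcSet")
            .print();
    }
}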

To test this approach, I ran our Peel bundle flink-broadcast
(https://github.com/TU-Berlin-DIMA/flink-broadcast) on the IBM Power cluster.
ibm-power has 8 nodes and we scale the number of slots per node from 1 to 16:

broadcast.ibm-power-1 broadcast.01 6597.33
broadcast.ibm-power-1 broadcast.02 5997
broadcast.ibm-power-1 broadcast.04 6576.67
broadcast.ibm-power-1 broadcast.08 7024.33
broadcast.ibm-power-1 broadcast.16 6933.33

The last column is the averaged run time in milliseconds over 3 runs. You can
clearly see, that the run time stays constant :)

As discussed, this approach doesn't work yet for native iterations (see
FLINK-1713).

So in the next weeks I will work on the native iterations as Stephan
proposed.

Best regards,
Felix



2016-08-09 21:29 GMT+07:00 Stephan Ewen :

> I agree with Till. Changing the basic data exchange mechanism would screw
> up many other ongoing efforts, like more incremental recovery.
>
> It seems to make this properly applicable, we need to first un-specialize
> the iterations.
>
> (1) Allow for "versioned" intermediate results, i.e., result-x-superstep1,
> result-x-superstep2, result-x-superstep3, result-x-superstep4, ...
> We need something similar for fined grained recovery in streaming
> (result-x-checkpoint1, result-x-checkpoint2, result-x-checkpoint3,
> result-x-checkpoint4, ...) so it may be worth addressing that soon anyways.
>
> (2) Make iterations not dependent on the special local back channel.
> Then we can simply schedule iterations like all other things.
>
> (3) Do the actual FLIP-5 proposal
>
>
> That's quite an effort, but I fear all else will break the engine and other
> efforts.
>
> Best,
> Stephan
>
>
>
>
>
> On Tue, Aug 9, 2016 at 4:05 PM, Till Rohrmann 
> wrote:
>
> > Hi Felix,
> >
> > if we cannot work around the problem with blocking intermediate results
> in
> > iterations, then we have to make FLINK-1713 a blocker for this new issue.
> > But maybe you can also keep the current broadcasting mechanism to be used
> > within iterations only. Then we can address the iteration problem later.
> >
> > Cheers,
> > Till
> >
> > On Tue, Aug 9, 2016 at 3:54 PM, Felix Neutatz 
> > wrote:
> >
> > > Hi Till,
> > >
> > > thanks for the fast answer. I also think this should be the way to go.
> So
> > > should I open a new jira "Make blocking SpillableSubpartition able to
> be
> > > read multiple times". Moreover should I mark this jira and FLINK-1713
> > > <https://issues.apache.org/jira/browse/FLINK-1713> as blocking for the
> > > broadcast jira? What do you think?
> > >
> > > Best regards,
> > > Felix
> > >
> > > 2016-08-09 17:41 GMT+07:00 Till Rohrmann :
> > >
> > > > Hi Felix,
> > > >
> > > > I'm not sure whether PipelinedSubpartition should be readable more
> than
> > > > once because then it would effectively mean that we materialize the
> > > > elements of the pipelined subpartition for stragglers. Therefore, I
> > think
> > > > that we should make blocking intermediate results readable more than
> > > once.
> > > > This will also be beneficial for interactive programs where we
> continue
> > > > from the results of previous Flink jobs.
> > > >
> > > > It might also be interesting to have a blocking mode which schedules
> > its
> > > > consumers once the first result is there. Thus, having a mixture of
> > > > pipelined and blocking mode.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz <
> neut...@googlemail.com>
> > > > wrote:
> > > >
> > > > > Hi Stephan,
> > > > >
> > > > > I did some research about blocking intermediate results. It turns
> out
> > > > that
> > > > > neither PipelinedSubpartition (see line 178) nor blocking
> > intermediate
> > > > > results (see SpillableSubpartition line: 189) can be read multiple
> > > times.
> > > > > Moreover blocking intermediate re

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-08-09 Thread Felix Neutatz
Hi Till,

thanks for the fast answer. I also think this should be the way to go. So
should I open a new JIRA "Make blocking SpillableSubpartition able to be
read multiple times"? Moreover, should I mark this JIRA and FLINK-1713
<https://issues.apache.org/jira/browse/FLINK-1713> as blocking for the
broadcast JIRA? What do you think?

Best regards,
Felix

2016-08-09 17:41 GMT+07:00 Till Rohrmann :

> Hi Felix,
>
> I'm not sure whether PipelinedSubpartition should be readable more than
> once because then it would effectively mean that we materialize the
> elements of the pipelined subpartition for stragglers. Therefore, I think
> that we should make blocking intermediate results readable more than once.
> This will also be beneficial for interactive programs where we continue
> from the results of previous Flink jobs.
>
> It might also be interesting to have a blocking mode which schedules its
> consumers once the first result is there. Thus, having a mixture of
> pipelined and blocking mode.
>
> Cheers,
> Till
>
> On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz 
> wrote:
>
> > Hi Stephan,
> >
> > I did some research about blocking intermediate results. It turns out
> that
> > neither PipelinedSubpartition (see line 178) nor blocking intermediate
> > results (see SpillableSubpartition line: 189) can be read multiple times.
> > Moreover blocking intermediate results are currently not supported in
> > native iterations (see https://issues.apache.org/jira/browse/FLINK-1713
> ).
> > So there are three ways to solve this:
> > 1) We extend Pipelined subpartitions to make it possible to read them
> > multiple times
> > 2) We extend Blocking subpartitions to make it possible to read them
> > multiple times, but then we also have to fix FLINK-1713. So we can use
> > broadcasts in native iterations
> > 3) We create one pipelined subpartition for every taskmanager. Problem:
> The
> > more taskmanager there are, the more redundant data we store, but the
> > network traffic stays optimal.
> >
> > Thank you for your help,
> > Felix
> >
> > 2016-08-01 22:51 GMT+07:00 Stephan Ewen :
> >
> > > Hi Felix!
> > >
> > > Hope this helps_
> > >
> > > Concerning (1.1) - The producer does not think in term of number of
> > target
> > > TaskManagers. That number can, after all, change in the presence of a
> > > failure and recovery. The producer should, for its own result, not care
> > how
> > > many consumers it will have (Tasks), but produce it only once.
> > >
> > > Concerning (1.2)  - Only "blocking" intermediate results can be
> consumed
> > > multiple times. Data sent to broadcast variables must thus be always a
> > > blocking intermediate result.
> > >
> > > Greetings,
> > > Stephan
> > >
> > >
> > > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <
> neut...@googlemail.com>
> > > wrote:
> > >
> > > > Hi Stephan,
> > > >
> > > > thanks for the great ideas. First I have some questions:
> > > >
> > > > 1.1) Does every task generate an intermediate result partition for
> > every
> > > > target task or is that already implemented in a way so that there are
> > > only
> > > > as many intermediate result partitions per task manager as target
> > tasks?
> > > > (Example: There are 2 task managers with 2 tasks each. Do we get 4
> > > > intermediate result partitions per task manager or do we get 8?)
> > > > 1.2) How can I consume an intermediate result partition multiple
> times?
> > > > When I tried that I got the following exception:
> > > > Caused by: java.lang.IllegalStateException: Subpartition 0 of
> > > > dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is
> > > being
> > > > or already has been consumed, but pipelined subpartitions can only be
> > > > consumed once.
> > > > at
> > > >
> > > > org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.
> > > createReadView(PipelinedSubpartition.java:179)
> > > > at
> > > >
> > > > org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.
> > > createReadView(PipelinedSubpartition.java:36)
> > > > at
> > > >
> > > > org.apache.flink.runtime.io.network.partition.ResultPartition.
> > > createSubpartitionView(ResultPartition.java:348)
> > > > at
> > > >

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-08-08 Thread Felix Neutatz
Hi Stephan,

I did some research about blocking intermediate results. It turns out that
neither PipelinedSubpartition (see line 178) nor blocking intermediate
results (see SpillableSubpartition line: 189) can be read multiple times.
Moreover blocking intermediate results are currently not supported in
native iterations (see https://issues.apache.org/jira/browse/FLINK-1713 ).
So there are three ways to solve this:
1) We extend Pipelined subpartitions to make it possible to read them
multiple times
2) We extend Blocking subpartitions to make it possible to read them
multiple times, but then we also have to fix FLINK-1713. So we can use
broadcasts in native iterations
3) We create one pipelined subpartition for every taskmanager. Problem: The
more task managers there are, the more redundant data we store, but the
network traffic stays optimal.

Thank you for your help,
Felix

2016-08-01 22:51 GMT+07:00 Stephan Ewen :

> Hi Felix!
>
> Hope this helps_
>
> Concerning (1.1) - The producer does not think in term of number of target
> TaskManagers. That number can, after all, change in the presence of a
> failure and recovery. The producer should, for its own result, not care how
> many consumers it will have (Tasks), but produce it only once.
>
> Concerning (1.2)  - Only "blocking" intermediate results can be consumed
> multiple times. Data sent to broadcast variables must thus be always a
> blocking intermediate result.
>
> Greetings,
> Stephan
>
>
> On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz 
> wrote:
>
> > Hi Stephan,
> >
> > thanks for the great ideas. First I have some questions:
> >
> > 1.1) Does every task generate an intermediate result partition for every
> > target task or is that already implemented in a way so that there are
> only
> > as many intermediate result partitions per task manager as target tasks?
> > (Example: There are 2 task managers with 2 tasks each. Do we get 4
> > intermediate result partitions per task manager or do we get 8?)
> > 1.2) How can I consume an intermediate result partition multiple times?
> > When I tried that I got the following exception:
> > Caused by: java.lang.IllegalStateException: Subpartition 0 of
> > dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is
> being
> > or already has been consumed, but pipelined subpartitions can only be
> > consumed once.
> > at
> >
> > org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.
> createReadView(PipelinedSubpartition.java:179)
> > at
> >
> > org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.
> createReadView(PipelinedSubpartition.java:36)
> > at
> >
> > org.apache.flink.runtime.io.network.partition.ResultPartition.
> createSubpartitionView(ResultPartition.java:348)
> > at
> >
> > org.apache.flink.runtime.io.network.partition.ResultPartitionManager.
> createSubpartitionView(ResultPartitionManager.java:81)
> > at
> >
> > org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.
> channelRead0(PartitionRequestServerHandler.java:98)
> > at
> >
> > org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.
> channelRead0(PartitionRequestServerHandler.java:41)
> > at
> >
> > io.netty.channel.SimpleChannelInboundHandler.channelRead(
> SimpleChannelInboundHandler.java:105)
> >
> > My status update: Since Friday I am implementing your idea described in
> > (2). Locally this approach already works (for less than 170 iterations).
> I
> > will investigate further to solve that issue.
> >
> > But I am still not sure how to implement (1). Maybe we introduce a
> similar
> > construct like the BroadcastVariableManager to share the RecordWriter
> among
> > all tasks of a taskmanager. I am interested in your thoughts :)
> >
> > Best regards,
> > Felix
> >
> > 2016-07-22 17:25 GMT+02:00 Stephan Ewen :
> >
> > > Hi Felix!
> > >
> > > Interesting suggestion. Here are some thoughts on the design.
> > >
> > > The two core changes needed to send data once to the TaskManagers are:
> > >
> > >   (1) Every sender needs to produce its stuff once (rather than for
> every
> > > target task), there should not be redundancy there.
> > >   (2) Every TaskManager should request the data once, other tasks in
> the
> > > same TaskManager pick it up from there.
> > >
> > >
> > > The current receiver-initialted pull model is actually a good
> abstraction
> > > for that, I think.
> > >
> > > Lets look at (1):
> > >
>

Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-07-27 Thread Felix Neutatz
Hi Stephan,

thanks for the great ideas. First I have some questions:

1.1) Does every task generate an intermediate result partition for every
target task or is that already implemented in a way so that there are only
as many intermediate result partitions per task manager as target tasks?
(Example: There are 2 task managers with 2 tasks each. Do we get 4
intermediate result partitions per task manager or do we get 8?)
1.2) How can I consume an intermediate result partition multiple times?
When I tried that I got the following exception:
Caused by: java.lang.IllegalStateException: Subpartition 0 of
dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is being
or already has been consumed, but pipelined subpartitions can only be
consumed once.
at
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
at
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
at
org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
at
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)

My status update: since Friday I have been implementing your idea described in
(2). Locally this approach already works (for fewer than 170 iterations). I
will investigate further to solve that issue.

But I am still not sure how to implement (1). Maybe we introduce a construct
similar to the BroadcastVariableManager to share the RecordWriter among
all tasks of a taskmanager. I am interested in your thoughts :)

Best regards,
Felix

2016-07-22 17:25 GMT+02:00 Stephan Ewen :

> Hi Felix!
>
> Interesting suggestion. Here are some thoughts on the design.
>
> The two core changes needed to send data once to the TaskManagers are:
>
>   (1) Every sender needs to produce its stuff once (rather than for every
> target task), there should not be redundancy there.
>   (2) Every TaskManager should request the data once, other tasks in the
> same TaskManager pick it up from there.
>
>
> The current receiver-initialted pull model is actually a good abstraction
> for that, I think.
>
> Lets look at (1):
>
>   - Currently, the TaskManagers have a separate intermediate result
> partition for each target slot. They should rather have one intermediate
> result partition (saves also repeated serialization) that is consumed
> multiple times.
>
>   - Since the results that are to be broadcasted are always "blocking",
> they can be consumed (pulled)  multiples times.
>
> Lets look at (2):
>
>   - The current BroadcastVariableManager has the functionality to let the
> first accessor of the BC-variable materialize the result.
>
>   - It could be changed such that only the first accessor creates a
> RecordReader, so the others do not even request the stream. That way, the
> TaskManager should pull only one stream from each producing task, which
> means the data is transferred once.
>
>
> That would also work perfectly with the current failure / recovery model.
>
> What do you think?
>
> Stephan
>
>
> On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz 
> wrote:
>
> > Hi everybody,
> >
> > I want to improve the performance of broadcasts in Flink. Therefore Till
> > told me to start a FLIP on this topic to discuss how to go forward to
> solve
> > the current issues for broadcasts.
> >
> > The problem in a nutshell: Instead of sending data to each taskmanager
> only
> > once, at the moment the data is sent to each task. This means if there
> are
> > 3 slots on each taskmanager we will send the data 3 times instead of
> once.
> >
> > There are multiple ways to tackle this problem and I started to do some
> > research and investigate. You can follow my thought process here:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> >
> > This is my first FLIP. So please correct me, if I did something wrong.
> >
> > I am interested in your thoughts about how to solve this issue. Do you
> > think my approach is heading into the right direction or should we
> follow a
> > totally different one.
> >
> > I am happy about any comment :)
> >
> > Best regards,
> > Felix
> >
>


[DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts

2016-07-22 Thread Felix Neutatz
Hi everybody,

I want to improve the performance of broadcasts in Flink. Therefore Till
told me to start a FLIP on this topic to discuss how to go forward to solve
the current issues for broadcasts.

The problem in a nutshell: Instead of sending data to each taskmanager only
once, at the moment the data is sent to each task. This means if there are
3 slots on each taskmanager we will send the data 3 times instead of once.

There are multiple ways to tackle this problem, and I started to do some
research and investigation. You can follow my thought process here:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts

This is my first FLIP, so please correct me if I did something wrong.

I am interested in your thoughts about how to solve this issue. Do you
think my approach is heading in the right direction, or should we follow a
totally different one?

I am happy about any comment :)

Best regards,
Felix


Re: Broadcast data sent increases with # slots per TM

2016-07-21 Thread Felix Neutatz
Hi everybody,

I found an issue with my first approach; therefore I couldn't run the
experiments yet. In the design document I summarized my ideas and the work of
the last weeks on this issue.

You can find the design document here:
https://docs.google.com/document/d/1odYIvmQt4feonQF9q-btBnGvrzzN3lX0Os6rzHcCOjA/edit?usp=sharing

I highly appreciate any idea or comment and I am looking forward to the
discussion to finally solve this issue :)

Best regards,
Felix

2016-07-08 1:47 GMT+02:00 Felix Neutatz :

> Hi,
>
> i already started to work on this issue. Therefore I created a Jira:
> https://issues.apache.org/jira/browse/FLINK-4175
> I have already implemented a quick version which could solve it. I will
> run the experiments on the cluster first and will describe my approach on
> Monday :)
>
> Have a nice weekend,
> Felix
>
> P.S. for super curious people:
> https://github.com/FelixNeutatz/incubator-flink/commit/7d79d4dfe3f18208a73d6b692b3909f9c69a1da7
>
> 2016-06-09 11:50 GMT+02:00 Felix Neutatz :
>
>> Hi everybody,
>>
>> could we use the org.apache.flink.api.common.cache.DistributedCache to
>> work around this Broadcast issue for the moment, until we fixed it?
>> Or do you think it won't scale either?
>>
>> Best regards,
>> Felix
>>
>> 2016-06-09 10:57 GMT+02:00 Stephan Ewen :
>>
>>> Till is right. Broadcast joins currently materialize once per slot.
>>> Originally, the purely push based runtime was not good enough to handle
>>> it
>>> differently.
>>>
>>> By now, we could definitely handle BC Vars differently (only one slot per
>>> TM requests).
>>> For BC Joins, the hash tables do not coordinate spilling currently, which
>>> means that we cannot do multiple joins through the same hash table.
>>>
>>>
>>> On Thu, Jun 9, 2016 at 10:17 AM, Till Rohrmann 
>>> wrote:
>>>
>>> > If I'm not mistaken, then broadcast variables and broadcast inputs of
>>> joins
>>> > follow different code paths. Broadcast variables use additional input
>>> > channels and are read before the actual driver code runs. In contrast
>>> to
>>> > that, a join operation is a two input operator where the join driver
>>> > decides how to handle the inputs (which one to read first as build
>>> input).
>>> >
>>> > This also entails that the broadcast variable optimization, where each
>>> task
>>> > manager holds the data only once and copies of the data are discarded
>>> (but
>>> > they are transmitted multiple times to the TM), does not apply to the
>>> > broadcast join inputs. Here you should see an slightly worse
>>> performance
>>> > degradation with your initial benchmark if you increase the number of
>>> > slots.
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <
>>> > alexander.s.alexand...@gmail.com> wrote:
>>> >
>>> > > > As far as I know, the reason why the broadcast variables are
>>> > implemented
>>> > > that way is that the senders would have to know which sub-tasks are
>>> > > deployed to which TMs.
>>> > >
>>> > > As the broadcast variables are realized as additionally attached
>>> > "broadcast
>>> > > channels", I am assuming that the same behavior will apply for
>>> broadcast
>>> > > joins as well.
>>> > >
>>> > > Is this the case?
>>> > >
>>> > > Regards,
>>> > > Alexander
>>> > >
>>> > >
>>> > > 2016-06-08 17:13 GMT+02:00 Kunft, Andreas <
>>> andreas.ku...@tu-berlin.de>:
>>> > >
>>> > > > Hi Till,
>>> > > >
>>> > > > thanks for the fast answer.
>>> > > > I'll think about a concrete way of implementing and open an JIRA.
>>> > > >
>>> > > > Best
>>> > > > Andreas
>>> > > > 
>>> > > > From: Till Rohrmann 
>>> > > > Sent: Wednesday, 8 June 2016 15:53
>>> > > > To: dev@flink.apache.org
>>> > > > Subject: Re: Broadcast data sent increases with # slots per TM
>>> > > >
>>> > > > Hi Andreas,
>>> > > >
>>> > > > your observation

Re: Broadcast data sent increases with # slots per TM

2016-07-07 Thread Felix Neutatz
Hi,

I already started to work on this issue. Therefore I created a JIRA:
https://issues.apache.org/jira/browse/FLINK-4175
I have already implemented a quick version which could solve it. I will run
the experiments on the cluster first and will describe my approach on
Monday :)

Have a nice weekend,
Felix

P.S. for super curious people:
https://github.com/FelixNeutatz/incubator-flink/commit/7d79d4dfe3f18208a73d6b692b3909f9c69a1da7

2016-06-09 11:50 GMT+02:00 Felix Neutatz :

> Hi everybody,
>
> could we use the org.apache.flink.api.common.cache.DistributedCache to
> work around this Broadcast issue for the moment, until we fixed it?
> Or do you think it won't scale either?
>
> Best regards,
> Felix
>
> 2016-06-09 10:57 GMT+02:00 Stephan Ewen :
>
>> Till is right. Broadcast joins currently materialize once per slot.
>> Originally, the purely push based runtime was not good enough to handle it
>> differently.
>>
>> By now, we could definitely handle BC Vars differently (only one slot per
>> TM requests).
>> For BC Joins, the hash tables do not coordinate spilling currently, which
>> means that we cannot do multiple joins through the same hash table.
>>
>>
>> On Thu, Jun 9, 2016 at 10:17 AM, Till Rohrmann 
>> wrote:
>>
>> > If I'm not mistaken, then broadcast variables and broadcast inputs of
>> joins
>> > follow different code paths. Broadcast variables use additional input
>> > channels and are read before the actual driver code runs. In contrast to
>> > that, a join operation is a two input operator where the join driver
>> > decides how to handle the inputs (which one to read first as build
>> input).
>> >
>> > This also entails that the broadcast variable optimization, where each
>> task
>> > manager holds the data only once and copies of the data are discarded
>> (but
>> > they are transmitted multiple times to the TM), does not apply to the
>> > broadcast join inputs. Here you should see an slightly worse performance
>> > degradation with your initial benchmark if you increase the number of
>> > slots.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <
>> > alexander.s.alexand...@gmail.com> wrote:
>> >
>> > > > As far as I know, the reason why the broadcast variables are
>> > implemented
>> > > that way is that the senders would have to know which sub-tasks are
>> > > deployed to which TMs.
>> > >
>> > > As the broadcast variables are realized as additionally attached
>> > "broadcast
>> > > channels", I am assuming that the same behavior will apply for
>> broadcast
>> > > joins as well.
>> > >
>> > > Is this the case?
>> > >
>> > > Regards,
>> > > Alexander
>> > >
>> > >
>> > > 2016-06-08 17:13 GMT+02:00 Kunft, Andreas > >:
>> > >
>> > > > Hi Till,
>> > > >
>> > > > thanks for the fast answer.
>> > > > I'll think about a concrete way of implementing and open an JIRA.
>> > > >
>> > > > Best
>> > > > Andreas
>> > > > 
>> > > > From: Till Rohrmann 
>> > > > Sent: Wednesday, 8 June 2016 15:53
>> > > > To: dev@flink.apache.org
>> > > > Subject: Re: Broadcast data sent increases with # slots per TM
>> > > >
>> > > > Hi Andreas,
>> > > >
>> > > > your observation is correct. The data is sent to each slot and the
>> > > > receiving TM only materializes one copy of the data. The rest of the
>> > data
>> > > > is discarded.
>> > > >
>> > > > As far as I know, the reason why the broadcast variables are
>> > implemented
>> > > > that way is that the senders would have to know which sub-tasks are
>> > > > deployed to which TMs. Only then, you can decide for which sub-tasks
>> > you
>> > > > can send the data together. Since the output emitters are agnostic
>> to
>> > the
>> > > > actual deployment, the necessary information would have to be
>> forwarded
>> > > to
>> > > > them.
>> > > >
>> > > > Another problem is that if you pick one of the sub-tasks to receive
>> the
>> > > > broadcast set

[jira] [Created] (FLINK-4175) Broadcast data sent increases with # slots per TM

2016-07-07 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-4175:


 Summary: Broadcast data sent increases with # slots per TM
 Key: FLINK-4175
 URL: https://issues.apache.org/jira/browse/FLINK-4175
 Project: Flink
  Issue Type: Improvement
  Components: Core, TaskManager
Affects Versions: 1.0.3
Reporter: Felix Neutatz
Assignee: Felix Neutatz


Problem:
We experience an unexpected increase of data sent over the network for 
broadcasts with an increasing number of slots per TaskManager.


We provided a benchmark [1]. The issue not only increases the amount of data sent 
over the network but also hurts performance, as seen in the preliminary results 
below. In these results, cloud-11 has 25 nodes and ibm-power has 8 nodes, and the 
number of slots per node is scaled from 1 to 16.


+-----------------------+--------------+-------------+
| suite                 | name         | median_time |
+=======================+==============+=============+
| broadcast.cloud-11    | broadcast.01 |        8796 |
| broadcast.cloud-11    | broadcast.02 |       14802 |
| broadcast.cloud-11    | broadcast.04 |       30173 |
| broadcast.cloud-11    | broadcast.08 |       56936 |
| broadcast.cloud-11    | broadcast.16 |      117507 |
| broadcast.ibm-power-1 | broadcast.01 |        6807 |
| broadcast.ibm-power-1 | broadcast.02 |        8443 |
| broadcast.ibm-power-1 | broadcast.04 |       11823 |
| broadcast.ibm-power-1 | broadcast.08 |       21655 |
| broadcast.ibm-power-1 | broadcast.16 |       37426 |
+-----------------------+--------------+-------------+



After looking into the code base, it seems that the data is de-serialized 
only once per TM, but the actual data is sent for all slots running the 
operator with broadcast vars and simply gets discarded in case it has already 
been de-serialized.


We do not see a reason why the data can't be shared among the slots of a TM and 
therefore sent just once.

[1] https://github.com/TU-Berlin-DIMA/flink-broadcast
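
For reference, the access pattern under test is an ordinary DataSet job with a 
broadcast set, roughly like the sketch below (names and sizes are made up here; 
the actual benchmark is in [1]):

{code}
import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.RichMapFunction

val env = ExecutionEnvironment.getExecutionEnvironment

// the data set that gets broadcast to the mapper below
val toBroadcast = env.generateSequence(1, 1000000)
val mainInput   = env.generateSequence(1, 1000)

val result = mainInput
  .map(new RichMapFunction[Long, Long] {
    override def map(value: Long): Long = {
      // materialized once per TM, but currently transmitted once per slot
      val bc = getRuntimeContext.getBroadcastVariable[Long]("bcSet")
      value + bc.size()
    }
  })
  .withBroadcastSet(toBroadcast, "bcSet")

result.count()
{code}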

This Jira will continue the discussion started here: 
https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3c1465386300767.94...@tu-berlin.de%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Broadcast data sent increases with # slots per TM

2016-06-09 Thread Felix Neutatz
Hi everybody,

could we use the org.apache.flink.api.common.cache.DistributedCache to work
around this broadcast issue for the moment, until we fix it?
Or do you think it won't scale either?
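
For concreteness, the workaround I have in mind is roughly the following
(untested sketch, file name and types made up): write the broadcast data out
once, register it as a cached file, and read it in open(), so it should only
be shipped once per task manager:

import java.io.File
import scala.io.Source
import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

val env = ExecutionEnvironment.getExecutionEnvironment

// the data that would otherwise be broadcast, written out beforehand
env.registerCachedFile("hdfs:///tmp/bc-data.txt", "bcData")

val input = env.generateSequence(1, 100)

val result = input.map(new RichMapFunction[Long, Long] {
  var bcLines: Seq[String] = _

  override def open(parameters: Configuration): Unit = {
    // the cached file is copied to the local file system of each task manager
    val file: File = getRuntimeContext.getDistributedCache.getFile("bcData")
    bcLines = Source.fromFile(file).getLines().toSeq
  }

  override def map(value: Long): Long = value + bcLines.size
})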

Best regards,
Felix

2016-06-09 10:57 GMT+02:00 Stephan Ewen :

> Till is right. Broadcast joins currently materialize once per slot.
> Originally, the purely push based runtime was not good enough to handle it
> differently.
>
> By now, we could definitely handle BC Vars differently (only one slot per
> TM requests).
> For BC Joins, the hash tables do not coordinate spilling currently, which
> means that we cannot do multiple joins through the same hash table.
>
>
> On Thu, Jun 9, 2016 at 10:17 AM, Till Rohrmann 
> wrote:
>
> > If I'm not mistaken, then broadcast variables and broadcast inputs of
> joins
> > follow different code paths. Broadcast variables use additional input
> > channels and are read before the actual driver code runs. In contrast to
> > that, a join operation is a two input operator where the join driver
> > decides how to handle the inputs (which one to read first as build
> input).
> >
> > This also entails that the broadcast variable optimization, where each
> task
> > manager holds the data only once and copies of the data are discarded
> (but
> > they are transmitted multiple times to the TM), does not apply to the
> > broadcast join inputs. Here you should see a slightly worse performance
> > degradation with your initial benchmark if you increase the number of
> > slots.
> >
> > Cheers,
> > Till
> >
> > On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <
> > alexander.s.alexand...@gmail.com> wrote:
> >
> > > > As far as I know, the reason why the broadcast variables are
> > implemented
> > > that way is that the senders would have to know which sub-tasks are
> > > deployed to which TMs.
> > >
> > > As the broadcast variables are realized as additionally attached
> > "broadcast
> > > channels", I am assuming that the same behavior will apply for
> broadcast
> > > joins as well.
> > >
> > > Is this the case?
> > >
> > > Regards,
> > > Alexander
> > >
> > >
> > > 2016-06-08 17:13 GMT+02:00 Kunft, Andreas  >:
> > >
> > > > Hi Till,
> > > >
> > > > thanks for the fast answer.
> > > > I'll think about a concrete way of implementing and open a JIRA.
> > > >
> > > > Best
> > > > Andreas
> > > > 
> > > > Von: Till Rohrmann 
> > > > Gesendet: Mittwoch, 8. Juni 2016 15:53
> > > > An: dev@flink.apache.org
> > > > Betreff: Re: Broadcast data sent increases with # slots per TM
> > > >
> > > > Hi Andreas,
> > > >
> > > > your observation is correct. The data is sent to each slot and the
> > > > receiving TM only materializes one copy of the data. The rest of the
> > data
> > > > is discarded.
> > > >
> > > > As far as I know, the reason why the broadcast variables are
> > implemented
> > > > that way is that the senders would have to know which sub-tasks are
> > > > deployed to which TMs. Only then, you can decide for which sub-tasks
> > you
> > > > can send the data together. Since the output emitters are agnostic to
> > the
> > > > actual deployment, the necessary information would have to be
> forwarded
> > > to
> > > > them.
> > > >
> > > > Another problem is that if you pick one of the sub-tasks to receive
> the
> > > > broadcast set, then you have to make sure, that this sub-task has
> read
> > > and
> > > > materialized the broadcast set before the other sub-tasks start
> > working.
> > > > One could maybe send to one sub-task first the broadcast set and then
> > to
> > > > all other sub-tasks, after one has sent the BC set, a kind of
> > acknowledge
> > > > record. That way, the other sub-tasks would block until the broadcast
> > set
> > > > has been completely transmitted. But here one has to make sure that
> the
> > > > sub-task receiving the BC set has been deployed and is not queued up
> > for
> > > > scheduling.
> > > >
> > > > So there are some challenges to solve in order to optimize the BC
> sets.
> > > > Currently, there is nobody working on it. If you want to start
> working
> > on
> > > > it, then I would recommend to open a JIRA and start writing a design
> > > > document for it.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Wed, Jun 8, 2016 at 1:45 PM, Kunft, Andreas <
> > > andreas.ku...@tu-berlin.de
> > > > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > >
> > > > > we experience some unexpected increase of data sent over the
> network
> > > for
> > > > > broadcasts with increasing number of slots per Taskmanager.
> > > > >
> > > > >
> > > > > We provided a benchmark [1]. It not only increases the size of data
> > > sent
> > > > > over the network but also hurts performance as seen in the
> > preliminary
> > > > > results below. In this results cloud-11 has 25 nodes and ibm-power
> > has
> > > 8
> > > > > nodes with scaling the number of slots per node from 1 - 16.
> > > > >
> > > > >
> > > > > +--

Re: A soft reminder

2015-07-27 Thread Felix Neutatz
Hi,

I also encountered the EOF exception for a delta iteration with "more
data". With less data it works ...

Best regards,
Felix
On 27.07.2015 10:25 AM, "Andra Lungu"  wrote:

> Hi Stephan,
>
> I tried to debug a bit around the EOF Exception. It seems that I am pretty
> useless on my own. I have no idea how to fix it. And it occurs in quite a
> lot of experiments... I believe it's something behind a delta iteration
> [this is the common point between my experiments and the others' code].
>
> Some pointers would be greatly appreciated :)
> Thanks!
> Andra
>
> On Mon, Jul 27, 2015 at 9:32 AM, Stephan Ewen  wrote:
>
> > Hey!
> >
> > It is pretty much as you said: No one had the time to fix this so far.
> They
> > are on the list, though,
> >
> > Do you want to pick up one of them?
> >
> > Greetings,
> > Stephan
> >
> >
> > On Fri, Jul 24, 2015 at 6:56 PM, Andra Lungu 
> > wrote:
> >
> > > Hi guys,
> > >
> > > I opened these JIRAs a while ago and, unfortunately, no one had time to
> > > look at them :|.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-2361
> > > [2] https://issues.apache.org/jira/browse/FLINK-2360
> > >
> > > [1] could probably be hacked, but [2] is a huge problem that was
> > > encountered by other people outside of Gelly.
> > >
> > > Thanks for the help!
> > > Andra
> > >
> >
>


serialization issue

2015-07-09 Thread Felix Neutatz
Hi,

I want to use t-digest by Ted Dunning (
https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/ArrayDigest.java)
on Flink.

Locally that works perfectly. But on the cluster I get the following error:

java.lang.Exception: Call to registerInputOutput() of invokable failed
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504)
at java.lang.Thread.run(Thread.java:853)
Caused by: java.lang.RuntimeException: Initializing the output handlers
failed: Could not load deserializer from the configuration.
at
org.apache.flink.runtime.operators.RegularPactTask.registerInputOutput(RegularPactTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:501)
... 1 more
Caused by: java.lang.RuntimeException: Could not load deserializer from the
configuration.
at
org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:83)
at
org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1085)
at
org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:543)
at
org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1267)
at
org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1375)
at
org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1076)
at
org.apache.flink.runtime.operators.RegularPactTask.registerInputOutput(RegularPactTask.java:254)
... 2 more
Caused by: java.io.InvalidClassException: java.lang.Integer; local class
incompatible: stream classdesc serialVersionUID = -8644826526760479189,
local class serialVersionUID = 1360826667806852920
at
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:698)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1705)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1600)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1873)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1432)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
at
org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
... 8 more

Is this an issue of Kryo in Flink? Or do I have to implement a custom
serializer?
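
For what it is worth, the next thing I would try is to register the t-digest
type with Kryo explicitly (rough sketch, assuming the Kryo registration hooks
on the ExecutionConfig are available in this snapshot):

import org.apache.flink.api.scala._
import com.tdunning.math.stats.ArrayDigest

val env = ExecutionEnvironment.getExecutionEnvironment
// force Flink to treat ArrayDigest as a Kryo type instead of falling back
// to generic Java serialization
env.getConfig.registerKryoType(classOf[ArrayDigest])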

Thank you for your help,

Felix


Re: Building several models in parallel

2015-07-09 Thread Felix Neutatz
The problem is that I am not able to incorporate this into a Flink
iteration since the MultipleLinearRegression() already contains an
iteration. Therefore this would be a nested iteration ;-)

So at the moment I just use a for loop like this:

data: DataSet[(model_ID, DataVector)]

for (current_model_ID <- uniqueModel_IDs) {

  val trainingDS = data
    .filter(t => t.model_ID == current_model_ID)
    .map(t => t.DataVector)

  val mlr = MultipleLinearRegression()
    .setIterations(10)
    .setStepsize(stepsize)

  mlr.fit(trainingDS)

  mlr.predict()

  ...
}

Is there a more efficient way to do this?

Thank you for your help,

Felix




2015-07-08 10:58 GMT+02:00 Till Rohrmann :

> Yes it is. But you can still run the calculation in parallel because `fit`
> does not trigger the execution of the job graph. It simply builds the data
> flow. Only if you call `predict` or collect the weights, it is executed.
>
> Cheers,
> Till
>
> On Wed, Jul 8, 2015 at 10:52 AM, Felix Neutatz 
> wrote:
>
> > Thanks for the information Till :)
> >
> > So at the moment the iteration is the only way.
> >
> > Best regards,
> > Felix
> >
> > 2015-07-08 10:43 GMT+02:00 Till Rohrmann :
> >
> > > Hi Felix,
> > >
> > > this is currently not supported by FlinkML. The
> MultipleLinearRegression
> > > algorithm expects a DataSet and not a GroupedDataSet as input. What you
> > can
> > > do is to extract each group from the original DataSet by using a filter
> > > operation. Once you have done this, you can train the linear model on
> > each
> > > sub part of the DataSet.
> > >
> > > Cheers,
> > > Till
> > > ​
> > >
> > > On Wed, Jul 8, 2015 at 10:37 AM, Felix Neutatz  >
> > > wrote:
> > >
> > > > Hi Felix,
> > > >
> > > > thanks for the idea. But doesn't this mean that I can only train one
> > > model
> > > > per partition? The thing is, I have way more models than that :(
> > > >
> > > > Best regards,
> > > > Felix
> > > >
> > > > 2015-07-07 22:37 GMT+02:00 Felix Schüler :
> > > >
> > > > > Hi Felix!
> > > > >
> > > > > We had a similar usecase and I trained multiple models on
> partitions
> > of
> > > > > my data with mapPartition and the model-parameters (weights) as
> > > > > broadcast variable. If I understood broadcast variables in Flink
> > > > > correctly, you should end up with one model on each TaskManager.
> > > > >
> > > > > Does that work?
> > > > >
> > > > > Felix
> > > > >
> > > > > Am 07.07.2015 um 17:32 schrieb Felix Neutatz:
> > > > > > Hi,
> > > > > >
> > > > > > at the moment I have a dataset which looks like this:
> > > > > >
> > > > > > DataSet[model_ID, DataVector] data
> > > > > >
> > > > > > So what I want to do is group by the model_ID and build for each
> > > > model_ID
> > > > > > one regression model
> > > > > >
> > > > > > in pseudo code:
> > > > > > data.groupBy(model_ID)
> > > > > > --> MultipleLinearRegression().fit(data_grouped)
> > > > > >
> > > > > > Is there anyway besides an iteration how to do this at the
> moment?
> > > > > >
> > > > > > Thanks for your help,
> > > > > >
> > > > > > Felix Neutatz
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: Building several models in parallel

2015-07-08 Thread Felix Neutatz
Thanks for the information Till :)

So at the moment the iteration is the only way.

Best regards,
Felix

2015-07-08 10:43 GMT+02:00 Till Rohrmann :

> Hi Felix,
>
> this is currently not supported by FlinkML. The MultipleLinearRegression
> algorithm expects a DataSet and not a GroupedDataSet as input. What you can
> do is to extract each group from the original DataSet by using a filter
> operation. Once you have done this, you can train the linear model on each
> sub part of the DataSet.
>
> Cheers,
> Till
> ​
>
> On Wed, Jul 8, 2015 at 10:37 AM, Felix Neutatz 
> wrote:
>
> > Hi Felix,
> >
> > thanks for the idea. But doesn't this mean that I can only train one
> model
> > per partition? The thing is, I have way more models than that :(
> >
> > Best regards,
> > Felix
> >
> > 2015-07-07 22:37 GMT+02:00 Felix Schüler :
> >
> > > Hi Felix!
> > >
> > > We had a similar usecase and I trained multiple models on partitions of
> > > my data with mapPartition and the model-parameters (weights) as
> > > broadcast variable. If I understood broadcast variables in Flink
> > > correctly, you should end up with one model on each TaskManager.
> > >
> > > Does that work?
> > >
> > > Felix
> > >
> > > Am 07.07.2015 um 17:32 schrieb Felix Neutatz:
> > > > Hi,
> > > >
> > > > at the moment I have a dataset which looks like this:
> > > >
> > > > DataSet[model_ID, DataVector] data
> > > >
> > > > So what I want to do is group by the model_ID and build for each
> > model_ID
> > > > one regression model
> > > >
> > > > in pseudo code:
> > > > data.groupBy(model_ID)
> > > > --> MultipleLinearRegression().fit(data_grouped)
> > > >
> > > > Is there anyway besides an iteration how to do this at the moment?
> > > >
> > > > Thanks for your help,
> > > >
> > > > Felix Neutatz
> > > >
> > >
> >
>


Re: Building several models in parallel

2015-07-08 Thread Felix Neutatz
Hi Felix,

thanks for the idea. But doesn't this mean that I can only train one model
per partition? The thing is, I have way more models than that :(

Best regards,
Felix

2015-07-07 22:37 GMT+02:00 Felix Schüler :

> Hi Felix!
>
> We had a similar usecase and I trained multiple models on partitions of
> my data with mapPartition and the model-parameters (weights) as
> broadcast variable. If I understood broadcast variables in Flink
> correctly, you should end up with one model on each TaskManager.
>
> Does that work?
>
> Felix
>
> Am 07.07.2015 um 17:32 schrieb Felix Neutatz:
> > Hi,
> >
> > at the moment I have a dataset which looks like this:
> >
> > DataSet[model_ID, DataVector] data
> >
> > So what I want to do is group by the model_ID and build for each model_ID
> > one regression model
> >
> > in pseudo code:
> > data.groupBy(model_ID)
> > --> MultipleLinearRegression().fit(data_grouped)
> >
> > Is there anyway besides an iteration how to do this at the moment?
> >
> > Thanks for your help,
> >
> > Felix Neutatz
> >
>


Building several models in parallel

2015-07-07 Thread Felix Neutatz
Hi,

at the moment I have a dataset which looks like this:

DataSet[model_ID, DataVector] data

So what I want to do is group by the model_ID and build for each model_ID
one regression model

in pseudo code:
data.groupBy(model_ID)
--> MultipleLinearRegression().fit(data_grouped)

Is there any way besides an iteration to do this at the moment?

Thanks for your help,

Felix Neutatz


Re: Read 727 gz files ()

2015-07-07 Thread Felix Neutatz
Yes, that's maybe the problem. The user max is set to 100.000 open files.

2015-07-06 15:55 GMT+02:00 Stephan Ewen :

> 4 mio file handles should be enough ;-)
>
> Is that the system global max, or the user's max? If the user's max us
> lower, this may be the issue...
>
> On Mon, Jul 6, 2015 at 3:50 PM, Felix Neutatz 
> wrote:
>
> > So do you know how to solve this issue apart from increasing the current
> > file-max (4748198)?
> >
> > 2015-07-06 15:35 GMT+02:00 Stephan Ewen :
> >
> > > I think the error is pretty much exactly in the stack trace:
> > >
> > > Caused by: java.io.FileNotFoundException:
> > > /data/4/hadoop/tmp/flink-io-0e2460bf-964b-4883-8eee-12869b9476ab/
> > > 995a38a2c92536383d0057e3482999a9.000329.channel
> > > (Too many open files in system)
> > >
> > >
> > >
> > >
> > > On Mon, Jul 6, 2015 at 3:31 PM, Felix Neutatz 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I want to do some simple aggregations on 727 gz files (68 GB total)
> > from
> > > > HDFS. See code here:
> > > >
> > > >
> > > >
> > >
> >
> https://github.com/FelixNeutatz/wikiTrends/blob/master/extraction/src/main/scala/io/sanfran/wikiTrends/extraction/flink/Stats.scala
> > > >
> > > > We are using a Flink-0.9 SNAPSHOT.
> > > >
> > > > I get the following error:
> > > >
> > > > Caused by: java.lang.Exception: The data preparation for task
> > > > 'Reduce(Reduce at
> > > >
> > > >
> > >
> >
> org.apache.flink.api.scala.GroupedDataSet.reduce(GroupedDataSet.scala:293))'
> > > > , caused an e
> > > > rror: Error obtaining the sorted input: Thread 'SortMerger spilling
> > > thread'
> > > > terminated due to an exception: Channel to path
> > > > '/data/4/hadoop/tmp/flink-io-0e2460bf-964b-488
> > > > 3-8eee-12869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel'
> > > could
> > > > not be opened.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > at java.lang.Thread.run(Thread.java:853)
> > > > Caused by: java.lang.RuntimeException: Error obtaining the sorted
> > input:
> > > > Thread 'SortMerger spilling thread' terminated due to an exception:
> > > Channel
> > > > to path '/data/4/hado
> > > >
> > > >
> > >
> >
> op/tmp/flink-io-0e2460bf-964b-4883-8eee-12869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel'
> > > > could not be opened.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.operators.ReduceDriver.prepare(ReduceDriver.java:93)
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > > > ... 3 more
> > > > Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> > > > terminated due to an exception: Channel to path
> > > > '/data/4/hadoop/tmp/flink-io-0e2460bf-964b-4883-8eee-1
> > > > 2869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel' could
> not
> > be
> > > > opened.
> > > > at
> > > >
> > > >
> > >
> >
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > > > Caused by: java.io.IOException: Channel to path
> > > >
> > > >
> > >
> >
> '/data/4/hadoop

Re: Read 727 gz files ()

2015-07-06 Thread Felix Neutatz
So do you know how to solve this issue apart from increasing the current
file-max (4748198)?

2015-07-06 15:35 GMT+02:00 Stephan Ewen :

> I think the error is pretty much exactly in the stack trace:
>
> Caused by: java.io.FileNotFoundException:
> /data/4/hadoop/tmp/flink-io-0e2460bf-964b-4883-8eee-12869b9476ab/
> 995a38a2c92536383d0057e3482999a9.000329.channel
> (Too many open files in system)
>
>
>
>
> On Mon, Jul 6, 2015 at 3:31 PM, Felix Neutatz 
> wrote:
>
> > Hi,
> >
> > I want to do some simple aggregations on 727 gz files (68 GB total) from
> > HDFS. See code here:
> >
> >
> >
> https://github.com/FelixNeutatz/wikiTrends/blob/master/extraction/src/main/scala/io/sanfran/wikiTrends/extraction/flink/Stats.scala
> >
> > We are using a Flink-0.9 SNAPSHOT.
> >
> > I get the following error:
> >
> > Caused by: java.lang.Exception: The data preparation for task
> > 'Reduce(Reduce at
> >
> >
> org.apache.flink.api.scala.GroupedDataSet.reduce(GroupedDataSet.scala:293))'
> > , caused an e
> > rror: Error obtaining the sorted input: Thread 'SortMerger spilling
> thread'
> > terminated due to an exception: Channel to path
> > '/data/4/hadoop/tmp/flink-io-0e2460bf-964b-488
> > 3-8eee-12869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel'
> could
> > not be opened.
> > at
> >
> >
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > at
> >
> >
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > at java.lang.Thread.run(Thread.java:853)
> > Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> > Thread 'SortMerger spilling thread' terminated due to an exception:
> Channel
> > to path '/data/4/hado
> >
> >
> op/tmp/flink-io-0e2460bf-964b-4883-8eee-12869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel'
> > could not be opened.
> > at
> >
> >
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > at
> >
> >
> org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > at
> >
> >
> org.apache.flink.runtime.operators.ReduceDriver.prepare(ReduceDriver.java:93)
> > at
> >
> >
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > ... 3 more
> > Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> > terminated due to an exception: Channel to path
> > '/data/4/hadoop/tmp/flink-io-0e2460bf-964b-4883-8eee-1
> > 2869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel' could not be
> > opened.
> > at
> >
> >
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > Caused by: java.io.IOException: Channel to path
> >
> >
> '/data/4/hadoop/tmp/flink-io-0e2460bf-964b-4883-8eee-12869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel'
> > could n
> > ot be opened.
> > at
> >
> >
> org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.(AbstractFileIOChannel.java:61)
> > at
> >
> >
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.(AsynchronousFileIOChannel.java:86)
> > at
> >
> >
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.(AsynchronousBlockWriterWithCallback.java:42)
> > at
> >
> >
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.(AsynchronousBlockWriter.java:44)
> > at
> >
> >
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.createBlockChannelWriter(IOManagerAsync.java:195)
> > at
> >
> >
> org.apache.flink.runtime.io.disk.iomanager.IOManager.createBlockChannelWriter(IOManager.java:218)
> > at
> >
> >
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1318)
> > at
> >
> >
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
> > Caused by: java.io.FileNotFoundException:
> >
> >
> /data/4/hadoop/tmp/flink-io-0e2460bf-964b-4883-8eee-12869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel
> > (Too many open
> > files in system)
> > at java.io.RandomAccessFile.(RandomAccessFile.java:252)
> > at java.io.RandomAccessFile.(RandomAccessFile.java:133)
> > at
> >
> >
> org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.(AbstractFileIOChannel.java:57)
> > ... 7 more
> >
> > Best regards,
> > Felix
> >
>


Read 727 gz files ()

2015-07-06 Thread Felix Neutatz
Hi,

I want to do some simple aggregations on 727 gz files (68 GB total) from
HDFS. See code here:

https://github.com/FelixNeutatz/wikiTrends/blob/master/extraction/src/main/scala/io/sanfran/wikiTrends/extraction/flink/Stats.scala

We are using a Flink-0.9 SNAPSHOT.

I get the following error:

Caused by: java.lang.Exception: The data preparation for task
'Reduce(Reduce at
org.apache.flink.api.scala.GroupedDataSet.reduce(GroupedDataSet.scala:293))'
, caused an e
rror: Error obtaining the sorted input: Thread 'SortMerger spilling thread'
terminated due to an exception: Channel to path
'/data/4/hadoop/tmp/flink-io-0e2460bf-964b-488
3-8eee-12869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel' could
not be opened.
at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:853)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger spilling thread' terminated due to an exception: Channel
to path '/data/4/hado
op/tmp/flink-io-0e2460bf-964b-4883-8eee-12869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel'
could not be opened.
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
at
org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
at
org.apache.flink.runtime.operators.ReduceDriver.prepare(ReduceDriver.java:93)
at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
terminated due to an exception: Channel to path
'/data/4/hadoop/tmp/flink-io-0e2460bf-964b-4883-8eee-1
2869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel' could not be
opened.
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
Caused by: java.io.IOException: Channel to path
'/data/4/hadoop/tmp/flink-io-0e2460bf-964b-4883-8eee-12869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel'
could n
ot be opened.
at
org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.<init>(AbstractFileIOChannel.java:61)
at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.<init>(AsynchronousFileIOChannel.java:86)
at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.<init>(AsynchronousBlockWriterWithCallback.java:42)
at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriter.<init>(AsynchronousBlockWriter.java:44)
at
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.createBlockChannelWriter(IOManagerAsync.java:195)
at
org.apache.flink.runtime.io.disk.iomanager.IOManager.createBlockChannelWriter(IOManager.java:218)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1318)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
Caused by: java.io.FileNotFoundException:
/data/4/hadoop/tmp/flink-io-0e2460bf-964b-4883-8eee-12869b9476ab/995a38a2c92536383d0057e3482999a9.000329.channel
(Too many open
files in system)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:252)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:133)
at
org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.<init>(AbstractFileIOChannel.java:57)
... 7 more

Best regards,
Felix


Hadoopinputformat for numpy and .mat matrices

2015-07-01 Thread Felix Neutatz
Hi everybody,

does anybody know whether there is an implementation of the Hadoop input
formats for matrices in numpy or .mat format? This would be very useful,
since a lot of machine learning data is stored in these formats.

Thanks for your help,

Felix


Documentation not reachable

2015-06-29 Thread Felix Neutatz
Hi,

when I want to access the documentation on the website I get the following
error:

http://ci.apache.org/projects/flink/flink-docs-release-0.9

Service Unavailable

The server is temporarily unable to service your request due to maintenance
downtime or capacity problems. Please try again later.

Is this a known problem?


Best regards,

Felix


Re: Apache Flink 0.9 ALS API

2015-06-13 Thread Felix Neutatz
Hi Ronny,

I agree with you, and I would go even further and generalize it overall, so
that the movieID could be of type Long or Int and the userID of type String.

This would increase usability of the ALS implementation :)
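
What I have in mind are signatures roughly like the following (hypothetical
sketch, not the current API):

// ratings and prediction requests keyed by arbitrary user / item id types
def fit[User, Item](ratings: DataSet[(User, Item, Double)]): Unit
def predict[User, Item](pairs: DataSet[(User, Item)]): DataSet[(User, Item, Double)]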

Best regards,
Felix

2015-06-10 11:28 GMT+02:00 Ronny Bräunlich :

> Hello everybody,
>
> for a university project we use the current implementation of ALS in Flink
> 0.9 and we were wondering about the API of predict() and fit() requiring a
> DataSet[(Int, Int)] or DataSet[(Int, Int, Double]) respectively, because
> the range of Int is quite limited.
> That is why we wanted to ask you if it wouldn’t be advantageous to change
> Int to Long, to allow more values.
> Please let me know what you think about it.
>
> Cheers,
> Ronny


Re: built problem - flink 0.9-SNAPSHOT

2015-06-11 Thread Felix Neutatz
done: https://issues.apache.org/jira/browse/FLINK-2208

2015-06-12 0:50 GMT+02:00 Ufuk Celebi :

>
> On 12 Jun 2015, at 00:42, Felix Neutatz  wrote:
>
> > Yes, it is on a IBM PowerPC machine. So we change that in the
> documentation
> > to all Java 7,8 ( except IBM Java)?
>
> Yes, you can also open an issue to investiage this further.
>


[jira] [Created] (FLINK-2208) Built error for IBM Java

2015-06-11 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-2208:


 Summary: Built error for IBM Java
 Key: FLINK-2208
 URL: https://issues.apache.org/jira/browse/FLINK-2208
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 0.9
Reporter: Felix Neutatz
Priority: Minor


Using IBM Java 7 will break the build:

{code:xml}
[INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @ 
flink-runtime ---
[INFO] /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/java:-1: 
info: compiling
[INFO] /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala:-1: 
info: compiling
[INFO] Compiling 461 source files to 
/share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/target/classes at 
1434059956648
[ERROR] 
/share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala:1768:
 error: type OperatingSystemMXBean is not a member of package com.sun.management
[ERROR] asInstanceOf[com.sun.management.OperatingSystemMXBean]).
[ERROR] ^
[ERROR] 
/share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala:1787:
 error: type OperatingSystemMXBean is not a member of package com.sun.management
[ERROR] val methodsList = 
classOf[com.sun.management.OperatingSystemMXBean].getMethods()
[ERROR]  ^
[ERROR] two errors found
[INFO] 
[INFO] Reactor Summary:
[INFO] 
[INFO] flink .. SUCCESS [ 14.447 s]
[INFO] flink-shaded-hadoop  SUCCESS [  2.548 s]
[INFO] flink-shaded-include-yarn .. SUCCESS [ 36.122 s]
[INFO] flink-shaded-include-yarn-tests  SUCCESS [ 36.980 s]
[INFO] flink-core . SUCCESS [ 21.887 s]
[INFO] flink-java . SUCCESS [ 16.023 s]
[INFO] flink-runtime .. FAILURE [ 20.241 s]
[INFO] flink-optimizer  SKIPPED


[hadoop@ibm-power-1 /]$ java -version
java version "1.7.0"
Java(TM) SE Runtime Environment (build pxp6470_27sr1fp1-20140708_01(SR1 FP1))
IBM J9 VM (build 2.7, JRE 1.7.0 Linux ppc64-64 Compressed References 
20140707_205525 (JIT enabled, AOT enabled)
J9VM - R27_Java727_SR1_20140707_1408_B205525
JIT  - tr.r13.java_20140410_61421.07
GC   - R27_Java727_SR1_20140707_1408_B205525_CMPRSS
J9CL - 20140707_205525)
JCL - 20140707_01 based on Oracle 7u65-b16
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: built problem - flink 0.9-SNAPSHOT

2015-06-11 Thread Felix Neutatz
Yes, it is on an IBM PowerPC machine. So should we change that in the documentation
to all Java 7 and 8 compilers (except IBM Java)?

2015-06-12 0:34 GMT+02:00 Ufuk Celebi :

> This is on an IBM PowerPC machine, right?
>
> Since this MXBeans are from the com.sun.* namespace, I'm not sure if this
> can be fixed w/o loading the MX beans depending on the JVM.
>
> For your JVM, the classes are located in "com.ibm.lang.management.*" and
> not "com.sun.management.*".
>
> On 12 Jun 2015, at 00:21, Felix Neutatz  wrote:
>
> > Hi,
> >
> > the documentation says: "It [the built of the 0.9 snapshot] works well
> with
> > OpenJDK 6 and all Java 7 and 8 compilers."
> >
> > But I got the following error:
> >
> > [INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @
> > flink-runtime ---
> > [INFO]
> > /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/java:-1:
> > info: compiling
> > [INFO]
> > /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala:-1:
> > info: compiling
> > [INFO] Compiling 461 source files to
> > /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/target/classes at
> > 1434059956648
> > [ERROR]
> >
> /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala:1768:
> > error: type OperatingSystemMXBean is not a member of package
> > com.sun.management
> > [ERROR]
>  asInstanceOf[com.sun.management.OperatingSystemMXBean]).
> > [ERROR] ^
> > [ERROR]
> >
> /share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala:1787:
> > error: type OperatingSystemMXBean is not a member of package
> > com.sun.management
> > [ERROR] val methodsList =
> > classOf[com.sun.management.OperatingSystemMXBean].getMethods()
> > [ERROR]  ^
> > [ERROR] two errors found
> > [INFO]
> > 
> > [INFO] Reactor Summary:
> > [INFO]
> > [INFO] flink .. SUCCESS [
> > 14.447 s]
> > [INFO] flink-shaded-hadoop  SUCCESS [
> > 2.548 s]
> > [INFO] flink-shaded-include-yarn .. SUCCESS [
> > 36.122 s]
> > [INFO] flink-shaded-include-yarn-tests  SUCCESS [
> > 36.980 s]
> > [INFO] flink-core . SUCCESS [
> > 21.887 s]
> > [INFO] flink-java . SUCCESS [
> > 16.023 s]
> > [INFO] flink-runtime .. FAILURE [
> > 20.241 s]
> > [INFO] flink-optimizer  SKIPPED
> >
> >
> > [hadoop@ibm-power-1 /]$ java -version
> > java version "1.7.0"
> > Java(TM) SE Runtime Environment (build pxp6470_27sr1fp1-20140708_01(SR1
> > FP1))
> > IBM J9 VM (build 2.7, JRE 1.7.0 Linux ppc64-64 Compressed References
> > 20140707_205525 (JIT enabled, AOT enabled)
> > J9VM - R27_Java727_SR1_20140707_1408_B205525
> > JIT  - tr.r13.java_20140410_61421.07
> > GC   - R27_Java727_SR1_20140707_1408_B205525_CMPRSS
> > J9CL - 20140707_205525)
> > JCL - 20140707_01 based on Oracle 7u65-b16
> >
> > Best regards,
> > Felix
>
>


built problem - flink 0.9-SNAPSHOT

2015-06-11 Thread Felix Neutatz
Hi,

the documentation says: "It [the build of the 0.9 snapshot] works well with
OpenJDK 6 and all Java 7 and 8 compilers."

But I got the following error:

[INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @
flink-runtime ---
[INFO]
/share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/java:-1:
info: compiling
[INFO]
/share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala:-1:
info: compiling
[INFO] Compiling 461 source files to
/share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/target/classes at
1434059956648
[ERROR]
/share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala:1768:
error: type OperatingSystemMXBean is not a member of package
com.sun.management
[ERROR] asInstanceOf[com.sun.management.OperatingSystemMXBean]).
[ERROR] ^
[ERROR]
/share/flink/flink-0.9-SNAPSHOT-wo-Yarn/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala:1787:
error: type OperatingSystemMXBean is not a member of package
com.sun.management
[ERROR] val methodsList =
classOf[com.sun.management.OperatingSystemMXBean].getMethods()
[ERROR]  ^
[ERROR] two errors found
[INFO]

[INFO] Reactor Summary:
[INFO]
[INFO] flink .. SUCCESS [
14.447 s]
[INFO] flink-shaded-hadoop  SUCCESS [
 2.548 s]
[INFO] flink-shaded-include-yarn .. SUCCESS [
36.122 s]
[INFO] flink-shaded-include-yarn-tests  SUCCESS [
36.980 s]
[INFO] flink-core . SUCCESS [
21.887 s]
[INFO] flink-java . SUCCESS [
16.023 s]
[INFO] flink-runtime .. FAILURE [
20.241 s]
[INFO] flink-optimizer  SKIPPED


[hadoop@ibm-power-1 /]$ java -version
java version "1.7.0"
Java(TM) SE Runtime Environment (build pxp6470_27sr1fp1-20140708_01(SR1
FP1))
IBM J9 VM (build 2.7, JRE 1.7.0 Linux ppc64-64 Compressed References
20140707_205525 (JIT enabled, AOT enabled)
J9VM - R27_Java727_SR1_20140707_1408_B205525
JIT  - tr.r13.java_20140410_61421.07
GC   - R27_Java727_SR1_20140707_1408_B205525_CMPRSS
J9CL - 20140707_205525)
JCL - 20140707_01 based on Oracle 7u65-b16

Best regards,
Felix


Re: Run scala.App on Cluster

2015-06-10 Thread Felix Neutatz
Thanks :) works like a charm.
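
For the archive: the invocation that works is presumably along these lines,
combining the -c flag with the jar and main class from my earlier mail:

/share/flink/flink-0.9-SNAPSHOT/bin/flink run \
  -c io.sanfran.wikiTrends.extraction.flink.DownloadTopKPages \
  /home/neutatz/jars/extraction-1.0-SNAPSHOT.jar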

2015-06-10 22:28 GMT+02:00 Fabian Hueske :

> Hi,
>
> use ./bin/flink run -c your.MainClass yourJar to specify the Main class.
> Check the documentation of the CLI client for details.
>
> Cheers, Fabian
> On Jun 10, 2015 22:24, "Felix Neutatz"  wrote:
>
> > Hi,
> >
> > I try to run this Scala program:
> >
> >
> https://github.com/FelixNeutatz/wikiTrends/blob/master/extraction/src/main/scala/io/sanfran/wikiTrends/extraction/flink/DownloadTopKPages.scala
> > on a cluster.
> >
> > I tried this command:
> >
> > /share/flink/flink-0.9-SNAPSHOT/bin/flink run
> > /home/neutatz/jars/extraction-1.0-SNAPSHOT.jar
> > io.sanfran.wikiTrends.extraction.flink.DownloadTopKPages
> >
> > org.apache.flink.client.program.ProgramInvocationException: Neither a
> > 'Main-Class', nor a 'program-class' entry was found in the jar file.
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:501)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:134)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:102)
> > at org.apache.flink.client.CliFrontend.buildProgram(CliFrontend.java:654)
> > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
> > at
> > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
> > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
> >
> > Do I have to change my Scala program or can I use another command?
> >
> > Thank you for your help,
> > Felix
> >
>


Run scala.App on Cluster

2015-06-10 Thread Felix Neutatz
Hi,

I try to run this Scala program:
https://github.com/FelixNeutatz/wikiTrends/blob/master/extraction/src/main/scala/io/sanfran/wikiTrends/extraction/flink/DownloadTopKPages.scala
on a cluster.

I tried this command:

/share/flink/flink-0.9-SNAPSHOT/bin/flink run
/home/neutatz/jars/extraction-1.0-SNAPSHOT.jar
io.sanfran.wikiTrends.extraction.flink.DownloadTopKPages

org.apache.flink.client.program.ProgramInvocationException: Neither a
'Main-Class', nor a 'program-class' entry was found in the jar file.
at
org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:501)
at
org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:134)
at
org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:102)
at org.apache.flink.client.CliFrontend.buildProgram(CliFrontend.java:654)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)

Do I have to change my Scala program or can I use another command?

Thank you for your help,
Felix


Re: Problem with ML pipeline

2015-06-08 Thread Felix Neutatz
I am in favor of efficiency. Therefore I would prefer to introduce new
methods, in order to save memory and network traffic. This would also solve
the problem of "how to come up with ids?"

Best regards,
Felix
On 08.06.2015 12:52 PM, "Sachin Goel"  wrote:

> I think if the user doesn't provide IDs, we can safely assume that they
> don't need it. We can just simply assign an ID of one as a temporary
> measure and return the result, with no IDs [just to make the interface
> cleaner].
> If the IDs are provided, in that case, we simply use those IDs.
> A possible template for this would be:
>
> implicit def predictValues[T <: Vector] = {
> new PredictOperation[SVM, T, LabeledVector]{
>   override def predict(
>   instance: SVM,
>   predictParameters: ParameterMap,
>   input: DataSet[T])
> : DataSet[LabeledVector] = {
> predict(ParameterMap,input.map(x=>(1,x))).map(x=> x._2)
> }
> }
> }
>
> implicit def predictValues[T <: (ID,Vector)] = {
> new PredictOperation[SVM, T, (ID,LabeledVector)]{
>   override def predict(
>   instance: SVM,
>   predictParameters: ParameterMap,
>   input: DataSet[T])
> : DataSet[LabeledVector] = {
> predict(ParameterMap,input)
> }
> }
> }
>
> Regards
> Sachin Goel
>
> On Mon, Jun 8, 2015 at 4:11 PM, Till Rohrmann 
> wrote:
>
> > My gut feeling is also that a `Transformer` would be a good place to
> > implement feature selection. Then you can simply reuse it across multiple
> > algorithms by simply chaining them together.
> >
> > However, I don't know yet what's the best way to realize the IDs. One way
> > would be to add an ID field to `Vector` and `LabeledVector`. Another way
> > would be to provide operations for `(ID, Vector)` and `(ID,
> LabeledVector)`
> > tuple types which reuse the implementations for `Vector` and
> > `LabeledVector`. This means that the developer doesn't have to implement
> > special operations for the tuple variants. The latter approach has the
> > advantage that you only use memory for IDs if you really need them.
> >
> > Another question is how to assign the IDs. Does the user have to provide
> > them? Are they randomly chosen. Or do we assign each element an
> increasing
> > index based on the total number of elements?
> >
> > On Mon, Jun 8, 2015 at 12:00 PM Mikio Braun 
> > wrote:
> >
> > > Hi all,
> > >
> > > I think there are number of issues here:
> > >
> > > - whether or not we generally need ids for our examples. For
> > > time-series, this is a must, but I think it would also help us with
> > > many other things (like partitioning the data, or picking a consistent
> > > subset), so I would think adding (numeric) ids in general to
> > > LabeledVector would be ok.
> > > - Some machinery to select features. My biggest concern here for
> > > putting that as a parameter to the learning algorithm is that this
> > > something independent of the learning algorith, so every algorithm
> > > would need to duplicate the code for that. I think it's better if the
> > > learning algorithm can assume that the LabelVector already contains
> > > all the relevant features, and then there should be other operations
> > > to project or extract a subset of examples.
> > >
> > > -M
> > >
> > > On Mon, Jun 8, 2015 at 10:01 AM, Till Rohrmann <
> till.rohrm...@gmail.com>
> > > wrote:
> > > > You're right Felix. You need to provide the `FitOperation` and
> > > > `PredictOperation` for the `Predictor` you want to use and the
> > > > `FitOperation` and `TransformOperation` for all `Transformer`s you
> want
> > > to
> > > > chain in front of the `Predictor`.
> > > >
> > > > Specifying which features to take could be a solution. However, then
> > > you're
> > > > always carrying data along which is not needed. Especially for large
> > > scale
> > > > data, this might be prohibitive expensive. I guess the more efficient
> > > > solution would be to assign an ID and later join with the removed
> > feature
> > > > elements.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Jun 8, 2015 at 7:11 AM Sachin Goel  >
> > > wrote:
> > > >
> > > >> A more general approach would be to take as input which

Re: Problem with ML pipeline

2015-06-07 Thread Felix Neutatz
Probably we need it for the other classes of the pipeline as well, in
order to be able to pass the ID through the whole pipeline.

Best regards,
Felix
On 06.06.2015 9:46 AM, "Till Rohrmann"  wrote:

> Then you only have to provide an implicit PredictOperation[SVM, (T, Int),
> (LabeledVector, Int)] value with T <: Vector in the scope where you call
> the predict operation.
> On Jun 6, 2015 8:14 AM, "Felix Neutatz"  wrote:
>
> > That would be great. I like the special predict operation better because
> it
> > is only in some cases necessary to return the id. The special predict
> > Operation would save this overhead.
> >
> > Best regards,
> > Felix
> > Am 04.06.2015 7:56 nachm. schrieb "Till Rohrmann" <
> till.rohrm...@gmail.com
> > >:
> >
> > > I see your problem. One way to solve the problem is to implement a
> > special
> > > PredictOperation which takes a tuple (id, vector) and returns a tuple
> > (id,
> > > labeledVector). You can take a look at the implementation for the
> vector
> > > prediction operation.
> > >
> > > But we can also discuss about adding an ID field to the Vector type.
> > >
> > > Cheers,
> > > Till
> > > On Jun 4, 2015 7:30 PM, "Felix Neutatz" 
> wrote:
> > >
> > > > Hi,
> > > >
> > > > I have the following use case: I want to to regression for a
> timeseries
> > > > dataset like:
> > > >
> > > > id, x1, x2, ..., xn, y
> > > >
> > > > id = point in time
> > > > x = features
> > > > y = target value
> > > >
> > > > In the Flink frame work I would map this to a LabeledVector (y,
> > > > DenseVector(x)). (I don't want to use the id as a feature)
> > > >
> > > > When I apply finally the predict() method I get a LabeledVector
> > > > (y_predicted, DenseVector(x)).
> > > >
> > > > Now my problem is that I would like to plot the predicted target
> value
> > > > according to its time.
> > > >
> > > > What I have to do now is:
> > > >
> > > > a = predictedDataSet.map ( LabeledVector => Tuple2(x,y_p))
> > > > b = originalDataSet.map("id, x1, x2, ..., xn, y" => Tuple2(x,id))
> > > >
> > > > a.join(b).where("x").equalTo("x") { (a,b) => (id, y_p)
> > > >
> > > > This is really a cumbersome process for such an simple thing. Is
> there
> > > any
> > > > approach which makes this more simple. If not, can we extend the ML
> > API.
> > > to
> > > > allow ids?
> > > >
> > > > Best regards,
> > > > Felix
> > > >
> > >
> >
>


Re: Problem with ML pipeline

2015-06-05 Thread Felix Neutatz
That would be great. I like the special predict operation better because it
is only necessary in some cases to return the id. The special predict
operation would save this overhead.

Best regards,
Felix
On 04.06.2015 7:56 PM, "Till Rohrmann"  wrote:

> I see your problem. One way to solve the problem is to implement a special
> PredictOperation which takes a tuple (id, vector) and returns a tuple (id,
> labeledVector). You can take a look at the implementation for the vector
> prediction operation.
>
> But we can also discuss about adding an ID field to the Vector type.
>
> Cheers,
> Till
> On Jun 4, 2015 7:30 PM, "Felix Neutatz"  wrote:
>
> > Hi,
> >
> > I have the following use case: I want to to regression for a timeseries
> > dataset like:
> >
> > id, x1, x2, ..., xn, y
> >
> > id = point in time
> > x = features
> > y = target value
> >
> > In the Flink frame work I would map this to a LabeledVector (y,
> > DenseVector(x)). (I don't want to use the id as a feature)
> >
> > When I apply finally the predict() method I get a LabeledVector
> > (y_predicted, DenseVector(x)).
> >
> > Now my problem is that I would like to plot the predicted target value
> > according to its time.
> >
> > What I have to do now is:
> >
> > a = predictedDataSet.map ( LabeledVector => Tuple2(x,y_p))
> > b = originalDataSet.map("id, x1, x2, ..., xn, y" => Tuple2(x,id))
> >
> > a.join(b).where("x").equalTo("x") { (a,b) => (id, y_p)
> >
> > This is really a cumbersome process for such an simple thing. Is there
> any
> > approach which makes this more simple. If not, can we extend the ML API.
> to
> > allow ids?
> >
> > Best regards,
> > Felix
> >
>


Re: ALS implementation

2015-06-05 Thread Felix Neutatz
Shouldn't Flink figure out on its own how much memory there is for the
join?

The detailed trace for the NullPointerException can be found here:
https://github.com/FelixNeutatz/IMPRO-3.SS15/blob/8b679f1c2808a2c6d6900824409fbd47e8bed826/NullPointerException.txt

Best regards,
Felix

2015-06-04 19:41 GMT+02:00 Till Rohrmann :

> I think it is not a problem of join hints, but rather of too little memory
> for the join operator. If you set the temporary directory, then the job
> will be split in smaller parts and thus each operator gets more memory.
> Alternatively, you can increase the memory you give to the Task Managers.
>
> The problem with the NullPointerException won't be solved by this, though.
> Could you send the full stack trace for that?
>
> Cheers,
> Till
> On Jun 4, 2015 7:10 PM, "Andra Lungu"  wrote:
>
> > Hi Felix,
> >
> > Passing a JoinHint to your function should help.
> > see:
> >
> >
> http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3ccanc1h_vffbqyyiktzcdpihn09r4he4oluiursjnci_rwc+c...@mail.gmail.com%3E
> >
> > Cheers,
> > Andra
> >
> > On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz 
> > wrote:
> >
> > > after bug fix:
> > >
> > > for 100 blocks and standard jvm heap space
> > >
> > > Caused by: java.lang.RuntimeException: Hash join exceeded maximum
> number
> > of
> > > recursions, without reducing partitions enough to be memory resident.
> > > Probably cause: Too many duplicate keys.
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > > at
> > org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > for 150 blocks and 5G jvm heap space
> > >
> > > Caused by: java.lang.NullPointerException
> > > at
> > >
> > >
> >
> org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> > > ...
> > >
> > > Best regards,
> > > Felix
> > >
> > > 2015-06-04 10:19 GMT+02:00 Felix Neutatz :
> > >
> > > > Yes, I will try it again with the newest update :)
> > > >
> > > > 2015-06-04 10:17 GMT+02:00 Till Rohrmann :
> > > >
> > > >> If the first error is not fixed by Chiwans PR, then we should
> create a
> > > >> JIRA
> > > >> for it to not forget it.
> > > >>
> > > >> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with
> > > this
> > > >> version?
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >>
> > > >> [1] https://github.com/apache/flink/pull/751
> > > >>
> > > >> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park  >
> > > >> wrote:
> > > >>
> > > >> > Hi. The second bug is fixed by the recent change in PR.
> > > >> > But there is just no test case for first bug.
> > > >> >
> > > >> > Regards,
> > > >> > Chiwan Park
> > > >> >
> > > >> > > On Jun 4, 2015, at 5:09 PM, Ufuk Celebi  wrote:
> > > >> > >
> > > >> > > I think both are bugs. They are triggered by the different
> memory
> > > >> > > configurations.
> > > >> > >
> > > >> > > @chiwan: is the 2nd error fixed by your recent change?
> > > >> > >
> &g

Re: ALS implementation

2015-06-04 Thread Felix Neutatz
Now the question is: which join in the ALS implementation is the problem :)
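
For reference, a join strategy hint is passed directly at the join call site
(with a and b standing for the two join inputs), so the fix would have to go
into the ALS code itself; a minimal sketch:

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

// tell the optimizer to sort-merge-join instead of building a hash table
val joined = a.join(b, JoinHint.REPARTITION_SORT_MERGE)
  .where(0)
  .equalTo(0)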

2015-06-04 19:09 GMT+02:00 Andra Lungu :

> Hi Felix,
>
> Passing a JoinHint to your function should help.
> see:
>
> http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3ccanc1h_vffbqyyiktzcdpihn09r4he4oluiursjnci_rwc+c...@mail.gmail.com%3E
>
> Cheers,
> Andra
>
> On Thu, Jun 4, 2015 at 7:07 PM, Felix Neutatz 
> wrote:
>
> > after bug fix:
> >
> > for 100 blocks and standard jvm heap space
> >
> > Caused by: java.lang.RuntimeException: Hash join exceeded maximum number
> of
> > recursions, without reducing partitions enough to be memory resident.
> > Probably cause: Too many duplicate keys.
> > at
> >
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
> > at
> >
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
> > at
> >
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
> > at
> >
> >
> org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > at
> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > at
> >
> >
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > at
> >
> >
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > at java.lang.Thread.run(Thread.java:745)
> >
> >
> > for 150 blocks and 5G jvm heap space
> >
> > Caused by: java.lang.NullPointerException
> > at
> >
> >
> org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> > ...
> >
> > Best regards,
> > Felix
> >
> > 2015-06-04 10:19 GMT+02:00 Felix Neutatz :
> >
> > > Yes, I will try it again with the newest update :)
> > >
> > > 2015-06-04 10:17 GMT+02:00 Till Rohrmann :
> > >
> > >> If the first error is not fixed by Chiwans PR, then we should create a
> > >> JIRA
> > >> for it to not forget it.
> > >>
> > >> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with
> > this
> > >> version?
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> [1] https://github.com/apache/flink/pull/751
> > >>
> > >> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park 
> > >> wrote:
> > >>
> > >> > Hi. The second bug is fixed by the recent change in PR.
> > >> > But there is just no test case for first bug.
> > >> >
> > >> > Regards,
> > >> > Chiwan Park
> > >> >
> > >> > > On Jun 4, 2015, at 5:09 PM, Ufuk Celebi  wrote:
> > >> > >
> > >> > > I think both are bugs. They are triggered by the different memory
> > >> > > configurations.
> > >> > >
> > >> > > @chiwan: is the 2nd error fixed by your recent change?
> > >> > >
> > >> > > @felix: if yes, can you try the 2nd run again with the changes?
> > >> > >
> > >> > > On Thursday, June 4, 2015, Felix Neutatz 
> > >> wrote:
> > >> > >
> > >> > >> Hi,
> > >> > >>
> > >> > >> I played a bit with the ALS recommender algorithm. I used the
> > >> movielens
> > >> > >> dataset:
> > >> > >>
> > http://files.grouplens.org/datasets/movielens/ml-latest-README.html
> > >> > >>
> > >> > >> The rating matrix has 21.063.128 entries (ratings).
> > >> > >>
> > >> > >> I run the algorithm with 3 configurations:
> > >> > >>
> > >> > >> 1. standard jvm heap space:
> > >> > >>
> > >> > >> val als = ALS()
> > >> > >>   .setIterations(10)
> > >> > >>   .setNumFactors(10)
> > >> > >>   .setBlocks(100)
> > >> > >>
> > >> > >> throws:
> > >> > >> java.lang.RuntimeException: Hash Join bug in memory management:
> > >> Memory
>

Problem with ML pipeline

2015-06-04 Thread Felix Neutatz
Hi,

I have the following use case: I want to do regression for a time series
dataset like:

id, x1, x2, ..., xn, y

id = point in time
x = features
y = target value

In the Flink frame work I would map this to a LabeledVector (y,
DenseVector(x)). (I don't want to use the id as a feature)

When I finally apply the predict() method I get a LabeledVector
(y_predicted, DenseVector(x)).

Now my problem is that I would like to plot the predicted target value
according to its time.

What I have to do now is:

a = predictedDataSet.map( LabeledVector => Tuple2(x, y_p) )
b = originalDataSet.map( "id, x1, x2, ..., xn, y" => Tuple2(x, id) )

a.join(b).where("x").equalTo("x") { (a, b) => (id, y_p) }

This is a really cumbersome process for such a simple thing. Is there an
approach that makes this simpler? If not, can we extend the ML API to
allow ids?

Best regards,
Felix
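
A minimal sketch of the join-back workaround described above. The names
(original, predicted, the tuple layout of original) are placeholders and not
from the mail; turning the feature vector into a String key is only one way to
avoid relying on vector equality and assumes each feature vector is unique:

import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.Vector

// predicted: (y_predicted, DenseVector(x)) as returned by predict()
// original:  (id, x, y) with the id that should be carried through
val predicted: DataSet[LabeledVector] = ???
val original: DataSet[(Long, Vector, Double)] = ???

val predByKey = predicted.map { lv => (lv.vector.toString, lv.label) }  // (x-key, y_pred)
val idByKey   = original.map { r => (r._2.toString, r._1) }             // (x-key, id)

// (id, y_predicted), ready to be ordered by id (the point in time) and plotted
val series: DataSet[(Long, Double)] =
  predByKey.join(idByKey).where(0).equalTo(0) { (p, o) => (o._2, p._2) }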


Re: ALS implementation

2015-06-04 Thread Felix Neutatz
After the bug fix:

For 100 blocks and the standard JVM heap space:

Caused by: java.lang.RuntimeException: Hash join exceeded maximum number of
recursions, without reducing partitions enough to be memory resident.
Probably cause: Too many duplicate keys.
at
org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:718)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:506)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:543)
at
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)


For 150 blocks and a 5 GB JVM heap:

Caused by: java.lang.NullPointerException
at
org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
...

Best regards,
Felix

2015-06-04 10:19 GMT+02:00 Felix Neutatz :

> Yes, I will try it again with the newest update :)
>
> 2015-06-04 10:17 GMT+02:00 Till Rohrmann :
>
>> If the first error is not fixed by Chiwans PR, then we should create a
>> JIRA
>> for it to not forget it.
>>
>> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with this
>> version?
>>
>> Cheers,
>> Till
>>
>> [1] https://github.com/apache/flink/pull/751
>>
>> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park 
>> wrote:
>>
>> > Hi. The second bug is fixed by the recent change in PR.
>> > But there is just no test case for first bug.
>> >
>> > Regards,
>> > Chiwan Park
>> >
>> > > On Jun 4, 2015, at 5:09 PM, Ufuk Celebi  wrote:
>> > >
>> > > I think both are bugs. They are triggered by the different memory
>> > > configurations.
>> > >
>> > > @chiwan: is the 2nd error fixed by your recent change?
>> > >
>> > > @felix: if yes, can you try the 2nd run again with the changes?
>> > >
>> > > On Thursday, June 4, 2015, Felix Neutatz 
>> wrote:
>> > >
>> > >> Hi,
>> > >>
>> > >> I played a bit with the ALS recommender algorithm. I used the
>> movielens
>> > >> dataset:
>> > >> http://files.grouplens.org/datasets/movielens/ml-latest-README.html
>> > >>
>> > >> The rating matrix has 21.063.128 entries (ratings).
>> > >>
>> > >> I run the algorithm with 3 configurations:
>> > >>
>> > >> 1. standard jvm heap space:
>> > >>
>> > >> val als = ALS()
>> > >>   .setIterations(10)
>> > >>   .setNumFactors(10)
>> > >>   .setBlocks(100)
>> > >>
>> > >> throws:
>> > >> java.lang.RuntimeException: Hash Join bug in memory management:
>> Memory
>> > >> buffers leaked.
>> > >> at
>> > >>
>> > >>
>> >
>> org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
>> > >> at
>> > >>
>> > >>
>> >
>> org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
>> > >> at
>> > >>
>> > >>
>> >
>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
>> > >> at
>> > >>
>> > >>
>> >
>> org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>> > >> at
>> > org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
>> > >> at
>> > >>
>> > >>
>> >
>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>> > >> at
>> > >>
>> > >>
>> >
>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>> > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> > >> at java.lang.Thread.run(Thread.java:745)
>> > >>

Re: ALS implementation

2015-06-04 Thread Felix Neutatz
Yes, I will try it again with the newest update :)

2015-06-04 10:17 GMT+02:00 Till Rohrmann :

> If the first error is not fixed by Chiwans PR, then we should create a JIRA
> for it to not forget it.
>
> @Felix: Chiwan's PR is here [1]. Could you try to run ALS again with this
> version?
>
> Cheers,
> Till
>
> [1] https://github.com/apache/flink/pull/751
>
> On Thu, Jun 4, 2015 at 10:10 AM, Chiwan Park 
> wrote:
>
> > Hi. The second bug is fixed by the recent change in PR.
> > But there is just no test case for first bug.
> >
> > Regards,
> > Chiwan Park
> >
> > > On Jun 4, 2015, at 5:09 PM, Ufuk Celebi  wrote:
> > >
> > > I think both are bugs. They are triggered by the different memory
> > > configurations.
> > >
> > > @chiwan: is the 2nd error fixed by your recent change?
> > >
> > > @felix: if yes, can you try the 2nd run again with the changes?
> > >
> > > On Thursday, June 4, 2015, Felix Neutatz 
> wrote:
> > >
> > >> Hi,
> > >>
> > >> I played a bit with the ALS recommender algorithm. I used the
> movielens
> > >> dataset:
> > >> http://files.grouplens.org/datasets/movielens/ml-latest-README.html
> > >>
> > >> The rating matrix has 21.063.128 entries (ratings).
> > >>
> > >> I run the algorithm with 3 configurations:
> > >>
> > >> 1. standard jvm heap space:
> > >>
> > >> val als = ALS()
> > >>   .setIterations(10)
> > >>   .setNumFactors(10)
> > >>   .setBlocks(100)
> > >>
> > >> throws:
> > >> java.lang.RuntimeException: Hash Join bug in memory management: Memory
> > >> buffers leaked.
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > >> at
> > org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > >> at java.lang.Thread.run(Thread.java:745)
> > >>
> > >> 2. 5G jvm heap space
> > >>
> > >> val als = ALS()
> > >>   .setIterations(10)
> > >>   .setNumFactors(10)
> > >>   .setBlocks(150)
> > >>
> > >> throws:
> > >>
> > >> java.lang.NullPointerException
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
> > >> at
> > org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > >> at
> > >>
> > >>
> >
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > >> at java.lang.Thread.run(Thread.java:745)
> > >>
> > >> 3. 14G jvm heap space
> > >>
> > >> val als = ALS()
> > >>   .setIterations(10)
> > >>   .setNumFactors(10)
> > >>   .setBlocks(150)
> > >>   .setTemporaryPath("/tmp/tmpALS")
> > >>
> > >> -> works
> > >>
> > >> Is this a Flink problem or is it just my bad configuration?
> > >>
> > >> Best regards,
> > >> Felix
> > >>
> >
> >
> >
> >
> >
> >
>


ALS implementation

2015-06-04 Thread Felix Neutatz
Hi,

I played a bit with the ALS recommender algorithm. I used the movielens
dataset: http://files.grouplens.org/datasets/movielens/ml-latest-README.html

The rating matrix has 21.063.128 entries (ratings).

I ran the algorithm with 3 configurations:

1. standard jvm heap space:

val als = ALS()
   .setIterations(10)
   .setNumFactors(10)
   .setBlocks(100)

throws:
java.lang.RuntimeException: Hash Join bug in memory management: Memory
buffers leaked.
at
org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:733)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
at
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

2. 5G jvm heap space

val als = ALS()
   .setIterations(10)
   .setNumFactors(10)
   .setBlocks(150)

throws:

java.lang.NullPointerException
at
org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:310)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1090)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:923)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:779)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:541)
at
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:173)
at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

3. 14G jvm heap space

val als = ALS()
   .setIterations(10)
   .setNumFactors(10)
   .setBlocks(150)
   .setTemporaryPath("/tmp/tmpALS")

-> works

Is this a Flink problem or is it just my bad configuration?

Best regards,
Felix
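
For reference, a slightly fuller sketch of the third (working) configuration.
The (user, item, rating) input layout is an assumption, not from the mail, and
the comment about the temporary path is based on the FlinkML ALS parameter
documentation of that era rather than re-verified here:

import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS

// (user, item, rating) triples, e.g. parsed from the MovieLens ratings file
val ratings: DataSet[(Int, Int, Double)] = ???

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
  .setTemporaryPath("/tmp/tmpALS")  // persists intermediate results to disk,
                                    // so less has to fit into the hash join's memory

als.fit(ratings)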


Re: MultipleLinearRegression - Strange results

2015-06-02 Thread Felix Neutatz
Yes, grid search solved the problem :)

2015-06-02 11:07 GMT+02:00 Till Rohrmann :

> The SGD algorithm adapts the learning rate accordingly. However, this does
> not help if you choose the initial learning rate too large because then you
> calculate a weight vector in the first iterations from which it takes
> really long to recover.
>
> Cheers,
> Till
>
> On Mon, Jun 1, 2015 at 7:15 PM, Sachin Goel 
> wrote:
>
> > You can set the learning rate to be 1/sqrt(iteration number). This
> usually
> > works.
> >
> > Regards
> > Sachin Goel
> >
> > On Mon, Jun 1, 2015 at 9:09 PM, Alexander Alexandrov <
> > alexander.s.alexand...@gmail.com> wrote:
> >
> > > I've seen some work on adaptive learning rates in the past days.
> > >
> > > Maybe we can think about extending the base algorithm and comparing the
> > use
> > > case setting for the IMPRO-3 project.
> > >
> > > @Felix you can discuss this with the others on Wednesday, Manu will be
> > also
> > > there and can give some feedback, I'll try to send a link tomorrow
> > > morning...
> > >
> > >
> > > 2015-06-01 20:33 GMT+10:00 Till Rohrmann :
> > >
> > > > Since MLR uses stochastic gradient descent, you probably have to
> > > configure
> > > > the step size right. SGD is very sensitive to the right step size
> > choice.
> > > > If the step size is too high, then the SGD algorithm does not
> converge.
> > > You
> > > > can find the parameter description here [1].
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> http://ci.apache.org/projects/flink/flink-docs-master/libs/ml/multiple_linear_regression.html
> > > >
> > > > On Mon, Jun 1, 2015 at 11:48 AM, Felix Neutatz <
> neut...@googlemail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I want to use MultipleLinearRegression, but I got really strange
> > > results.
> > > > > So I tested it with the housing price dataset:
> > > > >
> > > > >
> > > >
> > >
> >
> http://archive.ics.uci.edu/ml/machine-learning-databases/housing/housing.data
> > > > >
> > > > > And here I get negative house prices - even when I use the training
> > set
> > > > as
> > > > > dataset:
> > > > > LabeledVector(-1.1901998613214253E78, DenseVector(1500.0, 2197.0,
> > > 2978.0,
> > > > > 1369.0, 1451.0))
> > > > > LabeledVector(-2.7411218018254747E78, DenseVector(4445.0, 4522.0,
> > > 4038.0,
> > > > > 4223.0, 4868.0))
> > > > > LabeledVector(-2.688526857613956E78, DenseVector(4522.0, 4038.0,
> > > 4351.0,
> > > > > 4129.0, 4617.0))
> > > > > LabeledVector(-1.3075960386971714E78, DenseVector(2001.0, 2059.0,
> > > 1992.0,
> > > > > 2008.0, 2504.0))
> > > > > LabeledVector(-1.476238770814297E78, DenseVector(1992.0, 1965.0,
> > > 1983.0,
> > > > > 2300.0, 3811.0))
> > > > > LabeledVector(-1.4298128754759792E78, DenseVector(2059.0, 1992.0,
> > > 1965.0,
> > > > > 2425.0, 3178.0))
> > > > > ...
> > > > >
> > > > > and a huge squared error:
> > > > > Squared error: 4.799184832395361E159
> > > > >
> > > > > You can find my code here:
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/FelixNeutatz/wikiTrends/blob/master/extraction/src/test/io/sanfran/wikiTrends/extraction/flink/Regression.scala
> > > > >
> > > > > Can you help me? What did I do wrong?
> > > > >
> > > > > Thank you for your help,
> > > > > Felix
> > > > >
> > > >
> > >
> >
>
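
To make the step-size advice above concrete, a sketch of configuring FlinkML's
MultipleLinearRegression. The setter names follow the 0.9-era FlinkML
documentation and should be checked against the version in use; the values are
examples, not recommendations:

import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.regression.MultipleLinearRegression

val training: DataSet[LabeledVector] = ???  // e.g. the housing data from below

val mlr = MultipleLinearRegression()
  .setIterations(100)
  .setStepsize(0.001)              // SGD diverges if this is chosen too large
  .setConvergenceThreshold(0.001)

mlr.fit(training)
// trying several step sizes (a small grid search) is what resolved the
// exploding weights reported in this thread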


MultipleLinearRegression - Strange results

2015-06-01 Thread Felix Neutatz
Hi,

I want to use MultipleLinearRegression, but I got really strange results.
So I tested it with the housing price dataset:
http://archive.ics.uci.edu/ml/machine-learning-databases/housing/housing.data

And here I get negative house prices - even when I use the training set as
the dataset:
LabeledVector(-1.1901998613214253E78, DenseVector(1500.0, 2197.0, 2978.0,
1369.0, 1451.0))
LabeledVector(-2.7411218018254747E78, DenseVector(4445.0, 4522.0, 4038.0,
4223.0, 4868.0))
LabeledVector(-2.688526857613956E78, DenseVector(4522.0, 4038.0, 4351.0,
4129.0, 4617.0))
LabeledVector(-1.3075960386971714E78, DenseVector(2001.0, 2059.0, 1992.0,
2008.0, 2504.0))
LabeledVector(-1.476238770814297E78, DenseVector(1992.0, 1965.0, 1983.0,
2300.0, 3811.0))
LabeledVector(-1.4298128754759792E78, DenseVector(2059.0, 1992.0, 1965.0,
2425.0, 3178.0))
...

and a huge squared error:
Squared error: 4.799184832395361E159

You can find my code here:
https://github.com/FelixNeutatz/wikiTrends/blob/master/extraction/src/test/io/sanfran/wikiTrends/extraction/flink/Regression.scala

Can you help me? What did I do wrong?

Thank you for your help,
Felix


Re: New project website

2015-05-11 Thread Felix Neutatz
Hi Ufuk,

I really like the idea of redesigning the start page. But in my opinion
your page design looks more like a documentation page than a start
page.

Personally, I like the current design better, since you get a really quick
overview with many nice pictures. (If you want to dive in deeper, you can
check out the documentation and/or the wiki.)

Moreover, I have a design issue: can you align the structure so that there
aren't as many overlaps? Here you can see what I mean:
http://postimg.org/image/paogu6f3h/

This is only my personal opinion; maybe I have the wrong idea about start
pages.

Best regards,
Felix


2015-05-11 14:06 GMT+02:00 Hermann Gábor :

> Great!
> It looks way better than the current site :)
>
> On Mon, May 11, 2015 at 1:28 PM Stephan Ewen  wrote:
>
> > I think we may have to remove the term "Language Integrated Queries", I
> > think that is trademarked by Microsoft.
> >
> > Otherwise, +1
> >
> > On Mon, May 11, 2015 at 1:19 PM, Maximilian Michels 
> > wrote:
> >
> > > +1 very nice comprehensive overhaul of the website. I'd suggest to
> merge
> > > this as soon as possible. We can incrementally fix the remaining issues
> > and
> > > add a twitter feed to the front page.
> > >
> > > On Mon, May 11, 2015 at 12:09 PM, Ted Dunning 
> > > wrote:
> > >
> > > > If there is an active twitter entity for Flink, I recommend framing
> > that
> > > on
> > > > the home page.
> > > >
> > > >
> > > >
> > > > On Mon, May 11, 2015 at 8:51 AM, Ufuk Celebi  wrote:
> > > >
> > > > > Hey all,
> > > > >
> > > > > I reworked the project website the last couple of days and would
> like
> > > to
> > > > > share the preview:
> > > > >
> > > > > http://uce.github.io/flink-web/
> > > > >
> > > > > I would like to get this in asap. We can push incremental updates
> at
> > > any
> > > > > time, but I think this version is a big improvement over the
> current
> > > > status
> > > > > quo. If I get some +1s I'll go ahead and update the website today.
> > > > >
> > > > > – Ufuk
> > > >
> > >
> >
>


Re: Parquet Article / Tutorial

2015-05-11 Thread Felix Neutatz
Hi Flavio,

changing the block size may help. But I haven't played around with it yet.

Best regards,
Felix
On May 11, 2015, 12:23 PM, "Flavio Pompermaier" <
pomperma...@okkam.it> wrote:

> Hi Felix,
> I was looking at your code and I don't see any parquet.block.size settings.
> Do you think it is safe to keep defaults?
>
> On Fri, Apr 24, 2015 at 11:19 PM, Flavio Pompermaier  >
> wrote:
>
> > Thanks Felix,
> > Thanks fir the response!
> > I'm looking forward to use it!
> > On Apr 24, 2015 9:01 PM, "Felix Neutatz"  wrote:
> >
> >> Hi Flavio,
> >>
> >> in Thrift you can try:
> >>
> >> struct FlavioTuple {
> >> 1: optional string f1;
> >> 2: optional string f2;
> >> 3: optional list f3;
> >> }
> >>
> >> See: http://diwakergupta.github.io/thrift-missing-guide/
> >>
> >> I like Thrift the most, because the API for Thrift in Parquet is the
> >> easiest.
> >>
> >> Have fun with Parquet :)
> >>
> >> Best regards,
> >>
> >> Felix
> >>
> >> 2015-04-24 12:28 GMT+02:00 Flavio Pompermaier :
> >>
> >> > I was looking at this great example and I'd like to ask you which
> >> > serialization framework is the best if I have to serialize
> >> > Tuple3 with Parquet.
> >> > The syntax I like the most is the Thrift one but I can't see all the
> >> pros
> >> > and cons of using it and I'd like to hear your opinion here.
> >> >
> >> > Thanks in advance,
> >> > Flavio
> >> >
> >> > On Fri, Apr 10, 2015 at 2:52 PM, Robert Metzger 
> >> > wrote:
> >> >
> >> > > +1 for a blog post
> >> > >
> >> > > On Tue, Apr 7, 2015 at 5:17 PM, Henry Saputra <
> >> henry.sapu...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > +1 to the idea.
> >> > > >
> >> > > > Awesome work, Felix
> >> > > >
> >> > > > On Tuesday, April 7, 2015, Maximilian Michels 
> >> wrote:
> >> > > >
> >> > > > > Hi Felix,
> >> > > > >
> >> > > > > Very nice informative read.
> >> > > > >
> >> > > > > +1 for a short blog post and a full version in the wiki.
> >> > > > > +1 for putting this into flink-contrib
> >> > > > >
> >> > > > >
> >> > > > > On Tue, Apr 7, 2015 at 1:46 PM, Fabian Hueske <
> fhue...@gmail.com
> >> > > > > > wrote:
> >> > > > >
> >> > > > > > Very nice article!
> >> > > > > > How about adding the full article to the wiki and having a
> >> shorter
> >> > > > > version
> >> > > > > > as a blog post (with a link to the wiki)?
> >> > > > > > Adding the code to contrib would be great!
> >> > > > > >
> >> > > > > > 2015-04-07 12:45 GMT+02:00 Kostas Tzoumas <
> ktzou...@apache.org
> >> > > > > >:
> >> > > > > >
> >> > > > > > > Looks very nice! Would love to see a blog post on that!
> >> > > > > > >
> >> > > > > > > On Mon, Apr 6, 2015 at 7:19 PM, Felix Neutatz <
> >> > > > neut...@googlemail.com
> >> > > > > >
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > The intention was to post it on the blog, but if you think
> >> it
> >> > > would
> >> > > > > > > better
> >> > > > > > > > fit into the wiki, that would be also fine :)
> >> > > > > > > >
> >> > > > > > > > About the code: I have not thought about putting it to
> >> > > > > contrib-package,
> >> > > > > > > but
> >> > > > > > > > I can. What do you think is suitable for flink-contrib?
> >> > > > > > > >
> >> > > > > > > > Best regards,
> >> > > > > > > >
> >> > > > > > > > Felix
> >> > > > > > 

[jira] [Created] (FLINK-1939) Add Parquet Documentation to Wiki

2015-04-26 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-1939:


 Summary: Add Parquet Documentation to Wiki
 Key: FLINK-1939
 URL: https://issues.apache.org/jira/browse/FLINK-1939
 Project: Flink
  Issue Type: Task
  Components: Documentation
Reporter: Felix Neutatz
Assignee: Felix Neutatz
Priority: Trivial


Add documentation on how to read and write Parquet :)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Parquet Article / Tutorial

2015-04-24 Thread Felix Neutatz
Hi Flavio,

in Thrift you can try:

struct FlavioTuple {
1: optional string f1;
2: optional string f2;
3: optional list f3;
}

See: http://diwakergupta.github.io/thrift-missing-guide/

I like Thrift the most, because the API for Thrift in Parquet is the
easiest.

Have fun with Parquet :)

Best regards,

Felix

2015-04-24 12:28 GMT+02:00 Flavio Pompermaier :

> I was looking at this great example and I'd like to ask you which
> serialization framework is the best if I have to serialize
> Tuple3 with Parquet.
> The syntax I like the most is the Thrift one but I can't see all the pros
> and cons of using it and I'd like to hear your opinion here.
>
> Thanks in advance,
> Flavio
>
> On Fri, Apr 10, 2015 at 2:52 PM, Robert Metzger 
> wrote:
>
> > +1 for a blog post
> >
> > On Tue, Apr 7, 2015 at 5:17 PM, Henry Saputra 
> > wrote:
> >
> > > +1 to the idea.
> > >
> > > Awesome work, Felix
> > >
> > > On Tuesday, April 7, 2015, Maximilian Michels  wrote:
> > >
> > > > Hi Felix,
> > > >
> > > > Very nice informative read.
> > > >
> > > > +1 for a short blog post and a full version in the wiki.
> > > > +1 for putting this into flink-contrib
> > > >
> > > >
> > > > On Tue, Apr 7, 2015 at 1:46 PM, Fabian Hueske  > > > > wrote:
> > > >
> > > > > Very nice article!
> > > > > How about adding the full article to the wiki and having a shorter
> > > > version
> > > > > as a blog post (with a link to the wiki)?
> > > > > Adding the code to contrib would be great!
> > > > >
> > > > > 2015-04-07 12:45 GMT+02:00 Kostas Tzoumas  > > > >:
> > > > >
> > > > > > Looks very nice! Would love to see a blog post on that!
> > > > > >
> > > > > > On Mon, Apr 6, 2015 at 7:19 PM, Felix Neutatz <
> > > neut...@googlemail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > The intention was to post it on the blog, but if you think it
> > would
> > > > > > better
> > > > > > > fit into the wiki, that would be also fine :)
> > > > > > >
> > > > > > > About the code: I have not thought about putting it to
> > > > contrib-package,
> > > > > > but
> > > > > > > I can. What do you think is suitable for flink-contrib?
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Felix
> > > > > > >
> > > > > > > 2015-04-06 14:57 GMT+02:00 Stephan Ewen  > > > >:
> > > > > > >
> > > > > > > > Wow, very nice work!
> > > > > > > >
> > > > > > > > It looks impressive at first sight, no comments :-)
> > > > > > > >
> > > > > > > > Just one question: Do you want this to be a standalone
> > tutorial,
> > > or
> > > > > are
> > > > > > > you
> > > > > > > > thinking of putting this into the Flink wiki, or some code
> even
> > > > into
> > > > > > > > "flink-contrib" ?
> > > > > > > >
> > > > > > > > Stephan
> > > > > > > >
> > > > > > > >
> > > > > > > > On Sun, Apr 5, 2015 at 3:30 PM, Felix Neutatz <
> > > > > neut...@googlemail.com >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi everybody,
> > > > > > > > >
> > > > > > > > > I am working currently on a tutorial/article about
> > how/when/why
> > > > to
> > > > > > use
> > > > > > > > > Parquet on Flink.
> > > > > > > > >
> > > > > > > > > You can find the pdf version here:
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/FelixNeutatz/parquet-flinktacular/blob/master/tutorial/parquet_flinktacular.pdf
> > > > > > > > >
> > > > > > > > > The git repository with all the code examples can be found
> > > here:
> > > > > > > > > https://github.com/FelixNeutatz/parquet-flinktacular/
> > > > > > > > >
> > > > > > > > > What do you think about it? I am happy about every
> feedback I
> > > can
> > > > > get
> > > > > > > :)
> > > > > > > > >
> > > > > > > > > Have a nice Sunday,
> > > > > > > > >
> > > > > > > > > Felix
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: TableAPI - Join on two keys

2015-04-17 Thread Felix Neutatz
I am also against the manual cross method. Isn't the idea of the Table
API to hide the actual implementation from the user?

Best regards,
Felix
On Apr 17, 2015, 10:09 AM, "Till Rohrmann"  wrote:

> Why not doing two separate joins, union the results and doing a distinct
> operation on the combined key?
>
> On Fri, Apr 17, 2015 at 9:42 AM, Aljoscha Krettek 
> wrote:
>
> > So, the first thing is a "feature" of the Java API that removes
> > duplicate fields in keys, so an equi-join on (0,0) with (0,1) would
> > throw an error because one 0 is removed from the first key.
> >
> > The second thing is a feature of the Table API where the error message
> > is hinting at the problem:
> > Could not derive equi-join predicates for predicate 'nodeID === 'src
> > || 'nodeID === 'target
> >
> > The problem is, that this would have to be executed as a cross
> > followed by a filter because none of the predicates are equi-join
> > predicates that must always be true (because of the OR relation). This
> > I don't want to allow, because a cross can be very expensive. I will
> > add a jira ticket for adding a manual cross operation to the Table
> > API.
> >
> > On Thu, Apr 16, 2015 at 2:28 PM, Felix Neutatz 
> > wrote:
> > > Hi,
> > >
> > > I want to join two tables in the following way:
> > >
> > > case class WeightedEdge(src: Int, target: Int, weight: Double)
> > > case class Community(communityID: Int, nodeID: Int)
> > >
> > > case class CommunitySumTotal(communityID: Int, sumTotal: Double)
> > >
> > > val communities: DataSet[Community]
> > > val weightedEdges: DataSet[WeightedEdge]
> > >
> > > val communitiesTable = communities.toTable
> > > val weightedEdgesTable = weightedEdges.toTable
> > >
> > > val sumTotal = communitiesTable.join(weightedEdgesTable)
> > >  .where("nodeID = src && nodeID = target")
> > >  .groupBy('communityID)
> > >  .select("communityID, weight.sum as
> sumTotal").toSet[CommunitySumTotal]
> > >
> > >
> > > but I get this exception:
> > >
> > > Exception in thread "main"
> > > org.apache.flink.api.common.InvalidProgramException: The types of the
> key
> > > fields do not match: The number of specified keys is different.
> > > at
> > >
> >
> org.apache.flink.api.java.operators.JoinOperator.(JoinOperator.java:96)
> > > at
> > >
> >
> org.apache.flink.api.java.operators.JoinOperator$EquiJoin.(JoinOperator.java:197)
> > > at
> > >
> >
> org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
> > > at
> > >
> >
> org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
> > > at
> > >
> >
> org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
> > > at
> > >
> >
> org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
> > > at
> > >
> >
> org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
> > > at
> > >
> >
> org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
> > > at
> > >
> >
> org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
> > > Moreover when I use the following where clause:
> > >
> > > .where("nodeID = src || nodeID = target")
> > >
> > > I get another error:
> > >
> > > Exception in thread "main"
> > > org.apache.flink.api.table.ExpressionException: Could not derive
> > > equi-join predicates for predicate 'nodeID === 'src || 'nodeID ===
> > > 'target.
> > >
> > > at
> > >
> >
> org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
> > > at
> > >
> >
> org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
> > > at
> > >
> >
> org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
> > > at
> > >
> >
> org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
> > > at
> > >
> >
> org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
> > > at
> > >
> >
> org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
> > > at
> > >
> >
> org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
> > >
> > >
> > > Apart from that the TableApi seems really promising. It's a really
> great
> > tool.
> > >
> > > Thank you for your help,
> > >
> > > Felix
> >
>
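
A sketch of the two-joins-plus-union-plus-distinct approach suggested above,
written against the DataSet API with the case classes from the original mail.
It is only an illustration; whether the distinct/sum semantics match the
intended community weighting is for the reader to verify:

import org.apache.flink.api.scala._

val communities: DataSet[Community] = ???
val weightedEdges: DataSet[WeightedEdge] = ???

val bySrc = communities.join(weightedEdges).where("nodeID").equalTo("src") {
  (c, e) => (c.communityID, e.src, e.target, e.weight)
}
val byTarget = communities.join(weightedEdges).where("nodeID").equalTo("target") {
  (c, e) => (c.communityID, e.src, e.target, e.weight)
}

// distinct on (communityID, src, target) drops edges matched by both joins
val sumTotal: DataSet[CommunitySumTotal] = bySrc.union(byTarget)
  .distinct(0, 1, 2)
  .map(t => CommunitySumTotal(t._1, t._4))
  .groupBy("communityID")
  .reduce((a, b) => CommunitySumTotal(a.communityID, a.sumTotal + b.sumTotal))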


TableAPI - Join on two keys

2015-04-16 Thread Felix Neutatz
Hi,

I want to join two tables in the following way:

case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

case class CommunitySumTotal(communityID: Int, sumTotal: Double)

val communities: DataSet[Community]
val weightedEdges: DataSet[WeightedEdge]

val communitiesTable = communities.toTable
val weightedEdgesTable = weightedEdges.toTable

val sumTotal = communitiesTable.join(weightedEdgesTable)
 .where("nodeID = src && nodeID = target")
 .groupBy('communityID)
 .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]


but I get this exception:

Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: The types of the key
fields do not match: The number of specified keys is different.
at
org.apache.flink.api.java.operators.JoinOperator.(JoinOperator.java:96)
at
org.apache.flink.api.java.operators.JoinOperator$EquiJoin.(JoinOperator.java:197)
at
org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
at
org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at
org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
Moreover when I use the following where clause:

.where("nodeID = src || nodeID = target")

I get another error:

Exception in thread "main"
org.apache.flink.api.table.ExpressionException: Could not derive
equi-join predicates for predicate 'nodeID === 'src || 'nodeID ===
'target.

at
org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
at
org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
at
org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at
org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)


Apart from that, the Table API seems really promising. It's a really great tool.

Thank you for your help,

Felix


[jira] [Created] (FLINK-1899) Table API Bug

2015-04-16 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-1899:


 Summary: Table API Bug
 Key: FLINK-1899
 URL: https://issues.apache.org/jira/browse/FLINK-1899
 Project: Flink
  Issue Type: Bug
  Components: Expression API
Affects Versions: 0.9
Reporter: Felix Neutatz
Priority: Minor


I want to run the following program

{code:scala}
case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

case class CommunitySumTotal(communityID: Int, sumTotal: Double)

val communities: DataSet[Community]
val weightedEdges: DataSet[WeightedEdge]

val communitiesTable = communities.toTable 
val weightedEdgesTable = weightedEdges.toTable

val sumTotal = communitiesTable.join(weightedEdgesTable)
  .where("nodeID = src")
  .groupBy('communityID)
  .select('communityID, 'weight.sum).toSet[CommunitySumTotal]
{code}

but I get the exception below, even though in my opinion the outputs do have the same field 
types.

{code:xml}
Exception in thread "main" org.apache.flink.api.table.ExpressionException: 
Expression result type org.apache.flink.api.table.Row(communityID: Integer, 
intermediate.1: Double) does not have the same fields as output type 
io.ssc.trackthetrackers.analysis.algorithms.CommunitySumTotal(communityID: 
Integer, sumTotal: Double)
at 
org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:88)
at 
org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at 
org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$.detectCommunities(LouvainCommunityDetection.scala:105)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$delayedInit$body.apply(LouvainCommunityDetection.scala:38)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Collect of user defined classes doesn't work

2015-04-14 Thread Felix Neutatz
@Till: Yes, it works without the parentheses :) Thanks :)

2015-04-14 16:52 GMT+02:00 Felix Neutatz :

> I don't know. I can only see the following:
>
> def collect : scala.collection.mutable.Buffer[T] = { /* compiled code */ }
>
> When do they update the latest snapshot on Maven?
>
>
> 2015-04-14 15:52 GMT+02:00 Till Rohrmann :
>
>> Could you check the definition of the collect method in the DataSet.scala
>> file? Does it contain parentheses or not?
>>
>> On Tue, Apr 14, 2015 at 3:48 PM, Felix Neutatz 
>> wrote:
>>
>> > I use the latest maven snapshot:
>> >
>> > 
>> >   org.apache.flink
>> >   flink-scala
>> >   0.9-SNAPSHOT
>> > 
>> > 
>> >   org.apache.flink
>> >   flink-clients
>> >   0.9-SNAPSHOT
>> > 
>> >
>> >
>> > 2015-04-14 15:45 GMT+02:00 Robert Metzger :
>> >
>> > > Hi,
>> > >
>> > > which version of Flink are you using?
>> > >
>> > > On Tue, Apr 14, 2015 at 3:36 PM, Felix Neutatz <
>> neut...@googlemail.com>
>> > > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I want to run the following example:
>> > > >
>> > > > import org.apache.flink.api.scala._
>> > > >
>> > > > case class EdgeType(src: Int, target: Int)
>> > > >
>> > > > object Test {
>> > > >def main(args: Array[String]) {
>> > > >   implicit val env =
>> ExecutionEnvironment.getExecutionEnvironment
>> > > >
>> > > >   val graphEdges = readEdges("edges.csv")
>> > > >
>> > > >   graphEdges.collect()
>> > > >}
>> > > >def readEdges(file: String)(implicit env: ExecutionEnvironment)
>> = {
>> > > >   env.readCsvFile[EdgeType](file, "\n", "\t")
>> > > >}
>> > > > }
>> > > >
>> > > > But IntelliJ doesn't compile it and gives me the following
>> explanation:
>> > > >
>> > > >
>> > > > Error:(31, 21) not enough arguments for method apply: (n:
>> Int)EdgeType
>> > > > in trait BufferLike.
>> > > > Unspecified value parameter n.
>> > > > graphEdges.collect()
>> > > >   ^
>> > > >
>> > > > Can anyone help me out here?
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Felix
>> > > >
>> > >
>> >
>>
>
>


Re: Collect of user defined classes doesn't work

2015-04-14 Thread Felix Neutatz
I don't know. I can only see the following:

def collect : scala.collection.mutable.Buffer[T] = { /* compiled code */ }

When do they update the latest snapshot on Maven?


2015-04-14 15:52 GMT+02:00 Till Rohrmann :

> Could you check the definition of the collect method in the DataSet.scala
> file? Does it contain parentheses or not?
>
> On Tue, Apr 14, 2015 at 3:48 PM, Felix Neutatz 
> wrote:
>
> > I use the latest maven snapshot:
> >
> > 
> >   org.apache.flink
> >   flink-scala
> >   0.9-SNAPSHOT
> > 
> > 
> >   org.apache.flink
> >   flink-clients
> >   0.9-SNAPSHOT
> > 
> >
> >
> > 2015-04-14 15:45 GMT+02:00 Robert Metzger :
> >
> > > Hi,
> > >
> > > which version of Flink are you using?
> > >
> > > On Tue, Apr 14, 2015 at 3:36 PM, Felix Neutatz  >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I want to run the following example:
> > > >
> > > > import org.apache.flink.api.scala._
> > > >
> > > > case class EdgeType(src: Int, target: Int)
> > > >
> > > > object Test {
> > > >def main(args: Array[String]) {
> > > >   implicit val env = ExecutionEnvironment.getExecutionEnvironment
> > > >
> > > >   val graphEdges = readEdges("edges.csv")
> > > >
> > > >   graphEdges.collect()
> > > >}
> > > >def readEdges(file: String)(implicit env: ExecutionEnvironment) =
> {
> > > >   env.readCsvFile[EdgeType](file, "\n", "\t")
> > > >}
> > > > }
> > > >
> > > > But IntelliJ doesn't compile it and gives me the following
> explanation:
> > > >
> > > >
> > > > Error:(31, 21) not enough arguments for method apply: (n:
> Int)EdgeType
> > > > in trait BufferLike.
> > > > Unspecified value parameter n.
> > > > graphEdges.collect()
> > > >   ^
> > > >
> > > > Can anyone help me out here?
> > > >
> > > > Thanks,
> > > >
> > > > Felix
> > > >
> > >
> >
>


Re: Collect of user defined classes doesn't work

2015-04-14 Thread Felix Neutatz
I use the latest maven snapshot:


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala</artifactId>
  <version>0.9-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>0.9-SNAPSHOT</version>
</dependency>


2015-04-14 15:45 GMT+02:00 Robert Metzger :

> Hi,
>
> which version of Flink are you using?
>
> On Tue, Apr 14, 2015 at 3:36 PM, Felix Neutatz 
> wrote:
>
> > Hi,
> >
> > I want to run the following example:
> >
> > import org.apache.flink.api.scala._
> >
> > case class EdgeType(src: Int, target: Int)
> >
> > object Test {
> >def main(args: Array[String]) {
> >   implicit val env = ExecutionEnvironment.getExecutionEnvironment
> >
> >   val graphEdges = readEdges("edges.csv")
> >
> >   graphEdges.collect()
> >}
> >def readEdges(file: String)(implicit env: ExecutionEnvironment) = {
> >   env.readCsvFile[EdgeType](file, "\n", "\t")
> >}
> > }
> >
> > But IntelliJ doesn't compile it and gives me the following explanation:
> >
> >
> > Error:(31, 21) not enough arguments for method apply: (n: Int)EdgeType
> > in trait BufferLike.
> > Unspecified value parameter n.
> > graphEdges.collect()
> >   ^
> >
> > Can anyone help me out here?
> >
> > Thanks,
> >
> > Felix
> >
>


Fwd: Collect of user defined classes doesn't work

2015-04-14 Thread Felix Neutatz
Hi,

I want to run the following example:

import org.apache.flink.api.scala._

case class EdgeType(src: Int, target: Int)

object Test {
   def main(args: Array[String]) {
  implicit val env = ExecutionEnvironment.getExecutionEnvironment

  val graphEdges = readEdges("edges.csv")

  graphEdges.collect()
   }
   def readEdges(file: String)(implicit env: ExecutionEnvironment) = {
  env.readCsvFile[EdgeType](file, "\n", "\t")
   }
}

But IntelliJ doesn't compile it and gives me the following explanation:


Error:(31, 21) not enough arguments for method apply: (n: Int)EdgeType
in trait BufferLike.
Unspecified value parameter n.
graphEdges.collect()
  ^

Can anyone help me out here?

Thanks,

Felix
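
For the record, the resolution from the replies earlier in this archive: in
this snapshot the Scala DataSet API declared collect without parentheses, so
dropping them makes the example compile:

val edges = graphEdges.collect  // returns scala.collection.mutable.Buffer[EdgeType]
println(edges.size)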


Re: Parquet Article / Tutorial

2015-04-06 Thread Felix Neutatz
The intention was to post it on the blog, but if you think it would fit
better into the wiki, that would also be fine :)

About the code: I had not thought about putting it into the contrib package,
but I can. What do you think is suitable for flink-contrib?

Best regards,

Felix

2015-04-06 14:57 GMT+02:00 Stephan Ewen :

> Wow, very nice work!
>
> It looks impressive at first sight, no comments :-)
>
> Just one question: Do you want this to be a standalone tutorial, or are you
> thinking of putting this into the Flink wiki, or some code even into
> "flink-contrib" ?
>
> Stephan
>
>
> On Sun, Apr 5, 2015 at 3:30 PM, Felix Neutatz 
> wrote:
>
> > Hi everybody,
> >
> > I am working currently on a tutorial/article about how/when/why to use
> > Parquet on Flink.
> >
> > You can find the pdf version here:
> >
> >
> https://github.com/FelixNeutatz/parquet-flinktacular/blob/master/tutorial/parquet_flinktacular.pdf
> >
> > The git repository with all the code examples can be found here:
> > https://github.com/FelixNeutatz/parquet-flinktacular/
> >
> > What do you think about it? I am happy about every feedback I can get :)
> >
> > Have a nice Sunday,
> >
> > Felix
> >
>


Parquet Article / Tutorial

2015-04-05 Thread Felix Neutatz
Hi everybody,

I am currently working on a tutorial/article about how/when/why to use
Parquet on Flink.

You can find the pdf version here:
https://github.com/FelixNeutatz/parquet-flinktacular/blob/master/tutorial/parquet_flinktacular.pdf

The git repository with all the code examples can be found here:
https://github.com/FelixNeutatz/parquet-flinktacular/

What do you think about it? I am happy about any feedback I can get :)

Have a nice Sunday,

Felix


[jira] [Created] (FLINK-1820) Bug in DoubleParser and FloatParser - empty String is not casted to 0

2015-04-02 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-1820:


 Summary: Bug in DoubleParser and FloatParser - empty String is not 
casted to 0
 Key: FLINK-1820
 URL: https://issues.apache.org/jira/browse/FLINK-1820
 Project: Flink
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1, 0.8.0, 0.9
Reporter: Felix Neutatz
Assignee: Felix Neutatz
Priority: Critical
 Fix For: 0.9


Hi,

I found the bug when I wanted to read a CSV file that had a line like:
"||\n"

If I treat it as a Tuple2, I get, as expected, a tuple (0L, 0L).

But if I want to read it into a Double-Tuple or a Float-Tuple, I get the 
following error:

java.lang.AssertionError: Test failed due to a 
org.apache.flink.api.common.io.ParseException: Line could not be parsed: '||'
ParserError NUMERIC_VALUE_FORMAT_ERROR 

This error can be solved by adding an additional condition for empty strings in 
the FloatParser / DoubleParser.

We definitely need the CSVReader to be able to read "empty values".

I can fix it as described if there are no better ideas :)
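
Until such a parser fix lands, a user-side workaround sketch (the file name and
the manual pipe splitting are illustrative, not part of the report):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val doubles: DataSet[(Double, Double)] = env.readTextFile("data.csv").map { line =>
  val parts = line.split("\\|", -1)  // limit -1 keeps trailing empty fields
  def toD(s: String) = if (s.isEmpty) 0.0 else s.toDouble
  (toD(parts(0)), toD(parts(1)))
}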




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Should collect() and count() be treated as data sinks?

2015-04-02 Thread Felix Neutatz
Hi,

I have run the following program:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

List<Tuple1<Long>> l = Arrays.asList(new Tuple1<Long>(1L));
TypeInformation<Tuple1<Long>> t = TypeInfoParser.parse("Tuple1<Long>");
DataSet<Tuple1<Long>> data = env.fromCollection(l, t);

long value = data.count();
System.out.println(value);

env.execute("example");


Since there is no "real" data sink, I get the following:
Exception in thread "main" java.lang.RuntimeException: No data sinks have
been created yet. A program needs at least one sink that consumes data.
Examples are writing the data set or printing it.

In my opinion, we should handle count() and collect() like print().

What do you think?

Best regards,

Felix
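
A follow-up note: in Flink 0.9 and later, count() and collect() eagerly trigger
their own job execution and therefore behave like sinks, so a program such as
the sketch below needs neither an extra sink nor the trailing env.execute()
(the exact version in which this changed is from memory and worth checking):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements(1L, 2L, 3L)

println(data.count())  // runs its own job; no explicit sink, no env.execute()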


Re: Flink GZip support

2015-02-22 Thread Felix Neutatz
Hi Karim,

you can use a Hadoop Input Format and read the files
using flink-hadoop-compatibility classes like here:
http://flink.apache.org/docs/0.7-incubating/hadoop_compatibility.html

Have a nice Sunday,

Felix

2015-02-22 10:02 GMT+01:00 Karim Alaa :

> Hi All,
>
> I’m currently working with Flink 0.8.0 and I would like to know if there
> is or will be any support for handling Gzipped files
>
> Thanks!
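
A sketch of the Hadoop input format route described above. The package of
Flink's HadoopInputFormat wrapper changed between releases (the import below is
the 0.9-era Scala wrapper), so it may need adjusting for 0.8; the HDFS path is
an example:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

val env = ExecutionEnvironment.getExecutionEnvironment
val job = Job.getInstance()

val hadoopInput = new HadoopInputFormat[LongWritable, Text](
  new TextInputFormat, classOf[LongWritable], classOf[Text], job)
FileInputFormat.addInputPath(job, new Path("hdfs:///data/logs.gz"))

// Hadoop's TextInputFormat picks the gzip codec from the .gz file extension
val lines = env.createInput(hadoopInput).map(_._2.toString)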


Re: [VOTE] Release Apache Flink 0.8.1 (RC0)

2015-02-10 Thread Felix Neutatz
+1

+ Built from source
+ tested some Protobuf examples
+ tested some Parquet examples

Best regards,
Felix



2015-02-10 18:28 GMT+01:00 Márton Balassi :

> +1
>
> Built from source
> Run local examples
> Checked version numbers in the poms
> Validated check sums and signatures
>
> Minor cosmetic thing to note:
>
> The name of flink-parent (Apache Flink) in the maven build is still not
> nice and the inception year is set to 2015 (should be 2014). It seems that
> the commit fixing these only made to the master and not to release-0.8.
>
> Reactor Summary:
> [INFO]
> [INFO] Apache Flink .. SUCCESS [3.917s]
> [INFO] flink-shaded .. SUCCESS [3.135s]
> [INFO] flink-core  SUCCESS
> [22.577s]
> ...
>
> Best,
>
> Marton
>
>
> On Tue, Feb 10, 2015 at 5:31 PM, Robert Metzger 
> wrote:
>
> > Please vote on releasing the following candidate as Apache Flink version
> > 0.8.1
> >
> > This is a bugfix release for 0.8.0.
> >
> > -
> > The commit to be voted on is in the branch "release-0.8.0-rc3"
> > (commit 70943ad5
> > ):
> > *http://git-wip-us.apache.org/repos/asf/flink/commit/70943ad5
> > *
> >
> > The release artifacts to be voted on can be found at:
> > *http://people.apache.org/~rmetzger/flink-0.8.1-rc0/
> > *
> >
> > Release artifacts are signed with the following key:
> > *https://people.apache.org/keys/committer/rmetzger.asc
> > *
> >
> > The staging repository for this release can be found at:
> > *https://repository.apache.org/content/repositories/orgapacheflink-1029
> >  >*
> > -
> >
> >
> > Please vote on releasing this package as Apache Flink 0.8.1.
> >
> > The vote is open for the next 72 hours and passes if a majority of at
> least
> > three +1 PMC votes are cast.
> >
> > [ ] +1 Release this package as Apache Flink 0.8.1
> > [ ] -1 Do not release this package because ...
> >
>


Re: Planning Release 0.8.1

2015-02-10 Thread Felix Neutatz
Yes, that's the case. So this is even better :)

2015-02-10 10:22 GMT+01:00 Robert Metzger :

> @Felix: I think your changes have been backported by Aljoscha's commit to
> the release-0.8 branch:
>
> https://github.com/apache/flink/commit/cd2f88afdad4c9eb2cd141eb3283d2e0084b2527
>
> I would like to merge this to the "release-0.8" branch as well:
> https://github.com/apache/flink/pull/376
>
> After that, all important changes are in the branch. I'm going to create a
> release candidate once #376 is merged.
>
>
> On Mon, Feb 9, 2015 at 3:03 PM, Felix Neutatz 
> wrote:
>
> > Yes, that would be great :)
> >
> > 2015-02-09 14:37 GMT+01:00 Robert Metzger :
> >
> > > Yes.
> > >
> > > Do you mean this? https://github.com/apache/flink/pull/287
> > >
> > > I guess you would like to have this for the Parquet blog post? If so,
> we
> > > can merge it in my opinion.
> > >
> > >
> > > On Mon, Feb 9, 2015 at 2:30 PM, Felix Neutatz 
> > > wrote:
> > >
> > > > @Robert, does this also include the Kryo Protobuff support? If yes we
> > > could
> > > > also ship my changes of the Hadoopinputformat :)
> > > > Am 09.02.2015 14:27 schrieb "Robert Metzger" :
> > > >
> > > > > Cool.
> > > > >
> > > > > I'm currently also testing my last change (kryo serializers). I
> think
> > > > I'll
> > > > > start creating the release candidate in one hour.
> > > > >
> > > > > On Mon, Feb 9, 2015 at 2:24 PM, Márton Balassi <
> > > balassi.mar...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Went through the streaming commits with Gyula and assembled the
> > ones
> > > > > > missing. Pushing it as soon as travis passes.
> > > > > >
> > > > > > On Fri, Feb 6, 2015 at 2:26 PM, Robert Metzger <
> > rmetz...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > It seems that quite a few important fixes still need some work
> > > until
> > > > > they
> > > > > > > are ready.
> > > > > > > I'll extend the "deadline" to Monday morning (CET), since we
> can
> > > not
> > > > > vote
> > > > > > > during the weekends anyways.
> > > > > > >
> > > > > > > On Fri, Feb 6, 2015 at 1:16 PM, Stephan Ewen  >
> > > > wrote:
> > > > > > >
> > > > > > > > I am working on fixes for local split assignment and custom
> > input
> > > > > > split.
> > > > > > > > Would be good to include those as well...
> > > > > > > >
> > > > > > > > On Fri, Feb 6, 2015 at 12:52 PM, Aljoscha Krettek <
> > > > > aljos...@apache.org
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > @robert, yes, will do
> > > > > > > > >
> > > > > > > > > On Fri, Feb 6, 2015 at 12:28 PM, Márton Balassi
> > > > > > > > >  wrote:
> > > > > > > > > > Found a streaming bug, Gyula fixed it. Pushing it soon to
> > > both
> > > > > > master
> > > > > > > > and
> > > > > > > > > > branch-0.8.
> > > > > > > > > >
> > > > > > > > > > On Fri, Feb 6, 2015 at 11:51 AM, Robert Metzger <
> > > > > > rmetz...@apache.org
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> @Aljoscha, can you merge the backported fix to the
> > > > "release-0.8"
> > > > > > > > branch
> > > > > > > > > >> when its ready?
> > > > > > > > > >>
> > > > > > > > > >> On Fri, Feb 6, 2015 at 11:39 AM, Aljoscha Krettek <
> > > > > > > > aljos...@apache.org>
> > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > >

Re: Planning Release 0.8.1

2015-02-09 Thread Felix Neutatz
Yes, that would be great :)

2015-02-09 14:37 GMT+01:00 Robert Metzger :

> Yes.
>
> Do you mean this? https://github.com/apache/flink/pull/287
>
> I guess you would like to have this for the Parquet blog post? If so, we
> can merge it in my opinion.
>
>
> On Mon, Feb 9, 2015 at 2:30 PM, Felix Neutatz 
> wrote:
>
> > @Robert, does this also include the Kryo Protobuff support? If yes we
> could
> > also ship my changes of the Hadoopinputformat :)
> > Am 09.02.2015 14:27 schrieb "Robert Metzger" :
> >
> > > Cool.
> > >
> > > I'm currently also testing my last change (kryo serializers). I think
> > I'll
> > > start creating the release candidate in one hour.
> > >
> > > On Mon, Feb 9, 2015 at 2:24 PM, Márton Balassi <
> balassi.mar...@gmail.com
> > >
> > > wrote:
> > >
> > > > Went through the streaming commits with Gyula and assembled the ones
> > > > missing. Pushing it as soon as travis passes.
> > > >
> > > > On Fri, Feb 6, 2015 at 2:26 PM, Robert Metzger 
> > > > wrote:
> > > >
> > > > > It seems that quite a few important fixes still need some work
> until
> > > they
> > > > > are ready.
> > > > > I'll extend the "deadline" to Monday morning (CET), since we can
> not
> > > vote
> > > > > during the weekends anyways.
> > > > >
> > > > > On Fri, Feb 6, 2015 at 1:16 PM, Stephan Ewen 
> > wrote:
> > > > >
> > > > > > I am working on fixes for local split assignment and custom input
> > > > split.
> > > > > > Would be good to include those as well...
> > > > > >
> > > > > > On Fri, Feb 6, 2015 at 12:52 PM, Aljoscha Krettek <
> > > aljos...@apache.org
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > @robert, yes, will do
> > > > > > >
> > > > > > > On Fri, Feb 6, 2015 at 12:28 PM, Márton Balassi
> > > > > > >  wrote:
> > > > > > > > Found a streaming bug, Gyula fixed it. Pushing it soon to
> both
> > > > master
> > > > > > and
> > > > > > > > branch-0.8.
> > > > > > > >
> > > > > > > > On Fri, Feb 6, 2015 at 11:51 AM, Robert Metzger <
> > > > rmetz...@apache.org
> > > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > >> @Aljoscha, can you merge the backported fix to the
> > "release-0.8"
> > > > > > branch
> > > > > > > >> when its ready?
> > > > > > > >>
> > > > > > > >> On Fri, Feb 6, 2015 at 11:39 AM, Aljoscha Krettek <
> > > > > > aljos...@apache.org>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >> > I have a fix for this user-discovered bug:
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/browse/FLINK-1463?jql=project%20%3D%20FLINK%20AND%20assignee%20%3D%20currentUser()%20AND%20resolution%20%3D%20Unresolved
> > > > > > > >> >
> > > > > > > >> > in this PR: https://github.com/apache/flink/pull/353
> > > > > > > >> >
> > > > > > > >> > This should probably also be back-ported to 0.8.1
> > > > > > > >> >
> > > > > > > >> > On Thu, Feb 5, 2015 at 3:54 PM, Aljoscha Krettek <
> > > > > > aljos...@apache.org
> > > > > > > >
> > > > > > > >> > wrote:
> > > > > > > >> > > I have the PR ready. :D
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > On Thu, Feb 5, 2015 at 2:43 PM, Stephan Ewen <
> > > > se...@apache.org>
> > > > > > > wrote:
> > > > > > > >> > >> I would like to add it to 0.8.1. People h

Re: Planning Release 0.8.1

2015-02-09 Thread Felix Neutatz
@Robert, does this also include the Kryo Protobuf support? If yes, we could
also ship my changes to the HadoopInputFormat :)
On Feb 9, 2015, 2:27 PM, "Robert Metzger"  wrote:

> Cool.
>
> I'm currently also testing my last change (kryo serializers). I think I'll
> start creating the release candidate in one hour.
>
> On Mon, Feb 9, 2015 at 2:24 PM, Márton Balassi 
> wrote:
>
> > Went through the streaming commits with Gyula and assembled the ones
> > missing. Pushing it as soon as travis passes.
> >
> > On Fri, Feb 6, 2015 at 2:26 PM, Robert Metzger 
> > wrote:
> >
> > > It seems that quite a few important fixes still need some work until
> they
> > > are ready.
> > > I'll extend the "deadline" to Monday morning (CET), since we can not
> vote
> > > during the weekends anyways.
> > >
> > > On Fri, Feb 6, 2015 at 1:16 PM, Stephan Ewen  wrote:
> > >
> > > > I am working on fixes for local split assignment and custom input
> > split.
> > > > Would be good to include those as well...
> > > >
> > > > On Fri, Feb 6, 2015 at 12:52 PM, Aljoscha Krettek <
> aljos...@apache.org
> > >
> > > > wrote:
> > > >
> > > > > @robert, yes, will do
> > > > >
> > > > > On Fri, Feb 6, 2015 at 12:28 PM, Márton Balassi
> > > > >  wrote:
> > > > > > Found a streaming bug, Gyula fixed it. Pushing it soon to both
> > master
> > > > and
> > > > > > branch-0.8.
> > > > > >
> > > > > > On Fri, Feb 6, 2015 at 11:51 AM, Robert Metzger <
> > rmetz...@apache.org
> > > >
> > > > > wrote:
> > > > > >
> > > > > >> @Aljoscha, can you merge the backported fix to the "release-0.8"
> > > > branch
> > > > > >> when its ready?
> > > > > >>
> > > > > >> On Fri, Feb 6, 2015 at 11:39 AM, Aljoscha Krettek <
> > > > aljos...@apache.org>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > I have a fix for this user-discovered bug:
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://issues.apache.org/jira/browse/FLINK-1463?jql=project%20%3D%20FLINK%20AND%20assignee%20%3D%20currentUser()%20AND%20resolution%20%3D%20Unresolved
> > > > > >> >
> > > > > >> > in this PR: https://github.com/apache/flink/pull/353
> > > > > >> >
> > > > > >> > This should probably also be back-ported to 0.8.1
> > > > > >> >
> > > > > >> > On Thu, Feb 5, 2015 at 3:54 PM, Aljoscha Krettek <
> > > > aljos...@apache.org
> > > > > >
> > > > > >> > wrote:
> > > > > >> > > I have the PR ready. :D
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > On Thu, Feb 5, 2015 at 2:43 PM, Stephan Ewen <
> > se...@apache.org>
> > > > > wrote:
> > > > > >> > >> I would like to add it to 0.8.1. People have asked for it...
> > > > > >> > >>
> > > > > >> > >> On Thu, Feb 5, 2015 at 2:36 PM, Aljoscha Krettek <
> > > > > aljos...@apache.org
> > > > > >> >
> > > > > >> > >> wrote:
> > > > > >> > >>
> > > > > >> > >>> Do we want to have HadoopInputFormat support for Scala in 0.8.1?
> > > > > >> > >>>
> > > > > >> > >>> On Thu, Feb 5, 2015 at 2:29 PM, Stephan Ewen <
> > > se...@apache.org>
> > > > > >> wrote:
> > > > > >> > >>> > I think we need to make a pass through the recent 0.9 commits and
> > > > > >> > >>> > cherry pick some more into 0.8.1. There were quite a few bug fixes.
> > > > > >> > >>> >
> > > > > >> > >>> > Also, this one is rather critical and pending:
> > > > > >> > >>> > https://github.com/apache/flink/pull/318
> > > > > >> > >>> >
> > > > > >> > >>> > On Thu, Feb 5, 2015 at 2:27 PM, Robert Metzger <
> > > > > >> rmetz...@apache.org>
> > > > > >> > >>> wrote:
> > > > > >> > >>> >
> > > > > >> > >>> >> Hi guys,
> > > > > >> > >>> >>
> > > > > >> > >>> >> I would like to bundle a minor bugfix release for Flink soon.
> > > > > >> > >>> >> Some users were complaining about incomplete Kryo support, in
> > > > > >> > >>> >> particular for Avro.
> > > > > >> > >>> >>
> > > > > >> > >>> >> Also, we fixed some other issues which are easy to port to 0.8.1
> > > > > >> > >>> >> (some of them are already in the branch).
> > > > > >> > >>> >>
> > > > > >> > >>> >> I would like to start the vote on the 0.8.1 release in roughly 26
> > > > > >> > >>> >> hours. So please merge bugfixes you would like to have in 0.8.1 into
> > > > > >> > >>> >> the release-0.8 branch in the next 24 hours.
> > > > > >> > >>> >>
> > > > > >> > >>> >> There are currently 2 open JIRAs assigned to 0.8.1, one with a
> > > > > >> > >>> >> pending pull request:
> > > > > >> > >>> >>
> > > > > >> > >>> >> https://issues.apache.org/jira/browse/FLINK-1422?jql=project%20%3D%20FLINK%20AND%20status%20%3D%20Open%20AND%20fixVersion%20%3D%200.8.1%20ORDER%20BY%20status%20DESC%2C%20priority%20DESC
> > > > > >> > >>> >>
> > > > > >> > >>> >>
> > > > > >> > >>> >> Please chime in if you are suggesting a different plan for t

Re: Very strange behaviour of groupBy() -> sort() -> first()

2015-01-21 Thread Felix Neutatz
Thanks, @Fabian, your workaround works :)

But I think this feature is really missing. Shall we add this functionality
natively or via the proposed lib package?
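
For reference, a minimal self-contained sketch of the constant-key workaround
(assuming an already aggregated Tuple2<String, Long> data set; the class name,
sample data and field positions below are illustrative, not code from the thread):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class TopNWorkaround {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Pretend this is the already aggregated (String, sum) data set.
        DataSet<Tuple2<String, Long>> sums = env.fromElements(
                new Tuple2<String, Long>("string1", 100L),
                new Tuple2<String, Long>("string2", 50L),
                new Tuple2<String, Long>("string3", 1L));

        sums
            // attach a constant key so that every record ends up in one group
            .map(new MapFunction<Tuple2<String, Long>, Tuple3<Integer, String, Long>>() {
                @Override
                public Tuple3<Integer, String, Long> map(Tuple2<String, Long> v) {
                    return new Tuple3<Integer, String, Long>(0, v.f0, v.f1);
                }
            })
            .groupBy(0)                       // one global group
            .sortGroup(2, Order.DESCENDING)   // sort it by the summed value
            .first(3)                         // take the top 3 overall
            .print();                         // on 0.8.x, additionally call env.execute()
    }
}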

2015-01-21 20:38 GMT+01:00 Fabian Hueske :

> Chesnay is right.
> Right now, it is not possible to do what you want in a straightforward way
> because Flink does not support fully sorting a data set (there are several
> related issues in JIRA).
>
> A workaround would be to attach a constant value to each tuple, group on
> that (all tuples are sent to the same group), sort that group, and apply
> the first operator.
>
> 2015-01-21 20:22 GMT+01:00 Chesnay Schepler  >:
>
> > If I remember correctly, first() returns the first n values for every
> > group. The javadocs actually don't make this behaviour very clear.
> >
> >
> > On 21.01.2015 19:18, Felix Neutatz wrote:
> >
> >> Hi,
> >>
> >> my use case is the following:
> >>
> >> I have a Tuple2<String, Long>. I want to group by the String and sum up
> >> the Long values accordingly. This works fine with these lines:
> >>
> >> DataSet lineitems = getLineitemDataSet(env);
> >> lineitems.project(new int[]{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1);
> >>
> >> After the aggregation I want to print the 10 groups with the highest sum,
> >> like:
> >>
> >> string1, 100L
> >> string2, 50L
> >> string3, 1L
> >>
> >> I tried that:
> >>
> >> lineitems.project(new int[]{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1)
> >>     .groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print();
> >>
> >> But instead of 3 records, I get a lot more.
> >>
> >> Can you see my error?
> >>
> >> Best regards,
> >>
> >> Felix
> >>
> >>
> >
>


Very strange behaviour of groupBy() -> sort() -> first()

2015-01-21 Thread Felix Neutatz
Hi,

my use case is the following:

I have a Tuple2<String, Long>. I want to group by the String and sum up the
Long values accordingly. This works fine with these lines:

DataSet lineitems = getLineitemDataSet(env);
lineitems.project(new int[]{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1);

After the aggregation I want to print the 10 groups with the highest sum,
like:

string1, 100L
string2, 50L
string3, 1L

I tried that:

lineitems.project(new int[]{3,0}).groupBy(0).aggregate(Aggregations.SUM, 1)
    .groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print();

But instead of 3 records, I get a lot more.

Can you see my error?

Best regards,

Felix


[jira] [Created] (FLINK-1428) Typos in Java code example for RichGroupReduceFunction

2015-01-21 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-1428:


 Summary: Typos in Java code example for RichGroupReduceFunction
 Key: FLINK-1428
 URL: https://issues.apache.org/jira/browse/FLINK-1428
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Felix Neutatz
Priority: Minor


http://flink.apache.org/docs/0.7-incubating/dataset_transformations.html

String key = null //missing ';'

public void combine(Iterable> in,
  Collector> out))
--> one ')' too many





[jira] [Created] (FLINK-1412) Update Website Links - Optimizer Plan Visualization Tool

2015-01-18 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-1412:


 Summary: Update Website Links - Optimizer Plan Visualization Tool
 Key: FLINK-1412
 URL: https://issues.apache.org/jira/browse/FLINK-1412
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Felix Neutatz
Priority: Minor


http://flink.apache.org/news/2014/01/26/optimizer_plan_visualization_tool.html

All links point to dead Stratosphere urls





How to use org.apache.hadoop.mapreduce.lib.input.MultipleInputs in Flink

2015-01-17 Thread Felix Neutatz
Hi,

is there any example which shows how I can load several files with
different Hadoop input formats at once? My use case is that I want to load
two tables (in Parquet format) via Hadoop and join them within Flink.
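
For what it's worth, one common approach is to create one Flink HadoopInputFormat
wrapper per Hadoop input format, turn each into a DataSet, and then join the two
data sets. Below is a rough sketch only; the paths, the TextInputFormat /
KeyValueTextInputFormat choice, and the tab-separated key extraction are
illustrative assumptions, and Parquet input formats would slot into the same pattern:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class TwoHadoopInputsJoin {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // First table: plain text lines, read as (offset, line).
        Job jobA = Job.getInstance();
        FileInputFormat.addInputPath(jobA, new Path("hdfs:///tables/a"));
        DataSet<Tuple2<LongWritable, Text>> a = env.createInput(
                new HadoopInputFormat<LongWritable, Text>(
                        new TextInputFormat(), LongWritable.class, Text.class, jobA));

        // Second table: tab-separated (key, value) text, read with a different format.
        Job jobB = Job.getInstance();
        FileInputFormat.addInputPath(jobB, new Path("hdfs:///tables/b"));
        DataSet<Tuple2<Text, Text>> b = env.createInput(
                new HadoopInputFormat<Text, Text>(
                        new KeyValueTextInputFormat(), Text.class, Text.class, jobB));

        // Normalize both sides to (String key, String payload), then join on the key.
        DataSet<Tuple2<String, String>> left = a.map(
                new MapFunction<Tuple2<LongWritable, Text>, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(Tuple2<LongWritable, Text> v) {
                        String line = v.f1.toString();
                        String key = line.split("\t", 2)[0]; // assume the key is the first column
                        return new Tuple2<String, String>(key, line);
                    }
                });
        DataSet<Tuple2<String, String>> right = b.map(
                new MapFunction<Tuple2<Text, Text>, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(Tuple2<Text, Text> v) {
                        return new Tuple2<String, String>(v.f0.toString(), v.f1.toString());
                    }
                });

        left.join(right).where(0).equalTo(0)
                .writeAsText("hdfs:///output/joined");

        env.execute("join two Hadoop inputs");
    }
}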

Best regards,

Felix


[jira] [Created] (FLINK-1398) A new DataSet function: extractElementFromTuple

2015-01-13 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-1398:


 Summary: A new DataSet function: extractElementFromTuple
 Key: FLINK-1398
 URL: https://issues.apache.org/jira/browse/FLINK-1398
 Project: Flink
  Issue Type: Wish
Reporter: Felix Neutatz
Priority: Minor


This is the use case:

{code:java}
DataSet<Tuple2<Integer, Double>> data =
    env.fromElements(new Tuple2<Integer, Double>(1, 2.0));

data.map(new ElementFromTuple());


public static final class ElementFromTuple implements
        MapFunction<Tuple2<Integer, Double>, Double> {

    @Override
    public Double map(Tuple2<Integer, Double> value) {
        return value.f1;
    }
}
{code}

It would be awesome if we had something like this:

{code:java}
data.extractElement(1);
{code}

This means that we implement a function for DataSet which extracts a certain 
element from a given Tuple.
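
One possible shape for such a helper, sketched only as an illustration (ExtractField
is a made-up name, not existing Flink API; since the output type cannot be inferred
from the generic Tuple input, a return-type hint would likely be needed when using it):

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;

// Hypothetical generic extractor: pulls field `pos` out of a Tuple via
// Tuple#getField, so data.map(new ExtractField<Tuple2<Integer, Double>, Double>(1))
// would behave like the wished-for data.extractElement(1).
public static final class ExtractField<T extends Tuple, O> implements MapFunction<T, O> {

    private final int pos;

    public ExtractField(int pos) {
        this.pos = pos;
    }

    @Override
    public O map(T value) {
        // getField is generic, so the cast to O happens here; the caller must
        // make sure the field at `pos` really has type O.
        return value.getField(pos);
    }
}
{code}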





[jira] [Created] (FLINK-1393) Serializing Protobuf - issue 2

2015-01-12 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-1393:


 Summary: Serializing Protobuf - issue 2
 Key: FLINK-1393
 URL: https://issues.apache.org/jira/browse/FLINK-1393
 Project: Flink
  Issue Type: Bug
Reporter: Felix Neutatz
Priority: Minor


In addition to [FLINK-1392|https://issues.apache.org/jira/browse/FLINK-1392], I
also tried another Protobuf class, which throws a different exception:

{code:xml}
Exception in thread "main" java.lang.Exception: Deserializing the InputFormat 
([(null,url: "www.test.de"
archiveTime: 123
scripts: "script.de"
scripts: "script1.de"
iframes: "iframe.de"
iframes: "iframe1.de"
links: "link.de"
links: "link1.de"
images: "img.de"
images: "img1.de"
)]) failed: unread block data
at 
org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:34)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: unread block data
at 
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
at 
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:281)
at 
org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
... 25 more
{code}

The program which causes this exception can be found here: 
[https://github.com/FelixNeutatz/incubator-flink/blob/ParquetAtFlink/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/ParquetProtobufOutput2.java]





[jira] [Created] (FLINK-1392) Serializing Protobuf - issue 1

2015-01-12 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-1392:


 Summary: Serializing Protobuf - issue 1
 Key: FLINK-1392
 URL: https://issues.apache.org/jira/browse/FLINK-1392
 Project: Flink
  Issue Type: Bug
Reporter: Felix Neutatz
Priority: Minor


Hi, I started to experiment with Parquet using Protobuf.

When I use the standard Protobuf class: 
com.twitter.data.proto.tutorial.AddressBookProtos

The code which I run can be found here: 
[https://github.com/FelixNeutatz/incubator-flink/blob/ParquetAtFlink/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/ParquetProtobufOutput.java]

I get the following exception:

{code:xml}
Exception in thread "main" java.lang.Exception: Deserializing the 
InputFormat (org.apache.flink.api.java.io.CollectionInputFormat) failed: Could 
not read the user code wrapper: Error while deserializing element from 
collection
at 
org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:179)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1$$anonfun$applyOrElse$5.apply(JobManager.scala:172)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:172)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:34)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:27)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:27)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:52)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: 
org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could 
not read the user code wrapper: Error while deserializing element from 
collection
at 
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:285)
at 
org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
... 25 more
Caused by: java.io.IOException: Error while deserializing element from 
collection
at 
org.apache.flink.api.java.io.CollectionInputFormat.readObject(CollectionInputFormat.java:108)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)

[jira] [Created] (FLINK-1382) Void is not added to TypeInfoParser

2015-01-09 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-1382:


 Summary: Void is not added to TypeInfoParser
 Key: FLINK-1382
 URL: https://issues.apache.org/jira/browse/FLINK-1382
 Project: Flink
  Issue Type: Bug
Reporter: Felix Neutatz
Priority: Minor


List<Tuple2<Void, Long>> l = Arrays.asList(new Tuple2<Void, Long>(null, 1L));
TypeInformation<Tuple2<Void, Long>> t = TypeInfoParser.parse("Tuple2<Void, Long>");
DataSet<Tuple2<Void, Long>> data = env.fromCollection(l, t);
data.print();
Throws:
Exception in thread "main" java.lang.IllegalArgumentException: String could not 
be parsed: Class 'Void' could not be found for use as custom object. Please 
note that inner classes must be declared static.
at 
org.apache.flink.api.java.typeutils.TypeInfoParser.parse(TypeInfoParser.java:90)
at 
org.apache.flink.hadoopcompatibility.mapreduce.example.ParquetOutput.main(ParquetOutput.java:92)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.IllegalArgumentException: Class 'Void' could not be found 
for use as custom object. Please note that inner classes must be declared 
static.
at 
org.apache.flink.api.java.typeutils.TypeInfoParser.parse(TypeInfoParser.java:290)
at 
org.apache.flink.api.java.typeutils.TypeInfoParser.parse(TypeInfoParser.java:133)
at 
org.apache.flink.api.java.typeutils.TypeInfoParser.parse(TypeInfoParser.java:88)
... 6 more
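
A possible workaround (my assumption, not something stated in this ticket) is to
construct the TypeInformation programmatically instead of parsing a string,
provided BasicTypeInfo.VOID_TYPE_INFO and TupleTypeInfo are available in the
Flink version at hand:

{code:java}
// Hypothetical workaround: build the type info directly rather than via
// TypeInfoParser, assuming BasicTypeInfo.VOID_TYPE_INFO exists in this version.
TypeInformation<Tuple2<Void, Long>> t =
    new TupleTypeInfo<Tuple2<Void, Long>>(
        BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);

DataSet<Tuple2<Void, Long>> data = env.fromCollection(l, t);
data.print();
{code}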


