Re: Efficiency for Filter then Transform ( filter().map() vs flatMap() )

2015-09-04 Thread Stephan Ewen
We will definitely also try to get the chaining overhead down a bit.

BTW: To reach this kind of throughput, you need sources that can produce
very fast...

On Fri, Sep 4, 2015 at 12:20 AM, Welly Tambunan  wrote:

> Hi Stephan,
>
> That's good information to know. We will hit that throughput easily. Our
> computation graph has a lot of chaining like this right now.
> I think it's safe to minimize the chaining right now.
>
> Thanks a lot for this Stephan.
>
> Cheers
>
> On Thu, Sep 3, 2015 at 7:20 PM, Stephan Ewen  wrote:
>
>> In a set of benchmarks a while back, we found that the chaining mechanism
>> has some overhead right now, because of its abstraction. The abstraction
>> creates iterators for each element and makes it hard for the JIT to
>> specialize on the operators in the chain.
>>
>> For purely local chains at full speed, this overhead is observable (can
>> decrease throughput from 25mio elements/core to 15-20mio elements per
>> core). If your job does not reach that throughput, or is I/O bound, source
>> bound, etc, it does not matter.
>>
>> If you care about super high performance, collapsing the code into one
>> function helps.
>>
>> On Thu, Sep 3, 2015 at 5:59 AM, Welly Tambunan  wrote:
>>
>>> Hi Gyula,
>>>
>>> Thanks for your response. It seems I will use filter and map for now, as
>>> that one makes the intention clear and is not a big performance hit.
>>>
>>> Thanks again.
>>>
>>> Cheers
>>>
>>> On Thu, Sep 3, 2015 at 10:29 AM, Gyula Fóra 
>>> wrote:
>>>
 Hey Welly,

 If you call filter and map one after the other like you mentioned,
 these operators will be chained and executed as if they were running in the
 same operator.
 The only small performance overhead comes from the fact that the output
 of the filter will be copied before passing it as input to the map to keep
 immutability guarantees (but no serialization/deserialization will happen).
 Copying might be practically free depending on your data type, though.

 If you are using operators that don't make use of the immutability of
 inputs/outputs (i.e. you don't hold references to those values), then you can
 disable copying altogether by calling env.getConfig().enableObjectReuse(),
 in which case they will have exactly the same performance.
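 For illustration, here is a minimal sketch of the two variants and the
 object-reuse switch (not taken from this thread; the class name and the
 sample data are made up):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FilterMapVsFlatMap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> events = env.fromElements(
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("b", -1),
                new Tuple2<String, Integer>("c", 2));

        // Variant 1: filter(...).map(...) -- two operators, chained by Flink.
        DataStream<String> viaChain = events
                .filter(new FilterFunction<Tuple2<String, Integer>>() {
                    @Override
                    public boolean filter(Tuple2<String, Integer> e) { return e.f1 > 0; }
                })
                .map(new MapFunction<Tuple2<String, Integer>, String>() {
                    @Override
                    public String map(Tuple2<String, Integer> e) { return e.f0; }
                });

        // Variant 2: one flatMap that filters and transforms in a single function.
        DataStream<String> viaFlatMap = events
                .flatMap(new FlatMapFunction<Tuple2<String, Integer>, String>() {
                    @Override
                    public void flatMap(Tuple2<String, Integer> e, Collector<String> out) {
                        if (e.f1 > 0) {
                            out.collect(e.f0);
                        }
                    }
                });

        // If no references to input/output objects are held across calls,
        // copying between chained operators can be switched off:
        env.getConfig().enableObjectReuse();

        viaChain.print();
        viaFlatMap.print();
        env.execute("filter+map vs flatMap");
    }
}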

 Cheers,
 Gyula

 Welly Tambunan  ezt írta (időpont: 2015. szept. 3.,
 Cs, 4:33):

> Hi All,
>
> I would like to filter some item from the event stream. I think there
> are two ways doing this.
>
> Using the regular pipeline filter(...).map(...). We can also use
> flatMap for doing both in the same operator.
>
> Any performance improvement if we are using flatMap ? As that will be
> done in one operator instance.
>
>
> Cheers
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>

>>>
>>>
>>> --
>>> Welly Tambunan
>>> Triplelands
>>>
>>> http://weltam.wordpress.com
>>> http://www.triplelands.com 
>>>
>>
>>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


Re: Usage of Hadoop 2.2.0

2015-09-04 Thread Stephan Ewen
I am good with that as well. Mind that we are not only dropping a binary
distribution for Hadoop 2.2.0, but also the source compatibility with 2.2.0.



Let's also reconfigure Travis to test

 - Hadoop1
 - Hadoop 2.3
 - Hadoop 2.4
 - Hadoop 2.6
 - Hadoop 2.7


On Fri, Sep 4, 2015 at 6:19 AM, Chiwan Park  wrote:

> +1 for dropping Hadoop 2.2.0
>
> Regards,
> Chiwan Park
>
> > On Sep 4, 2015, at 5:58 AM, Ufuk Celebi  wrote:
> >
> > +1 to what Robert said.
> >
> > On Thursday, September 3, 2015, Robert Metzger 
> wrote:
> > I think most cloud providers moved beyond Hadoop 2.2.0.
> > Google's Click-To-Deploy is on 2.4.1
> > AWS EMR is on 2.6.0
> >
> > The situation for the distributions seems to be the following:
> > MapR 4 uses Hadoop 2.4.0 (current is MapR 5)
> > CDH 5.0 uses 2.3.0 (the current CDH release is 5.4)
> >
> > HDP 2.0  (October 2013) is using 2.2.0
> > HDP 2.1 (April 2014) uses 2.4.0 already
> >
> > So both vendors and cloud providers are multiple releases away from
> Hadoop 2.2.0.
> >
> > Spark does not offer a binary distribution lower than 2.3.0.
> >
> > In addition to that, I don't think that the HDFS client in 2.2.0 is
> really usable in production environments. Users were reporting
> ArrayIndexOutOfBounds exceptions for some jobs, I also had these exceptions
> sometimes.
> >
> > The easiest approach  to resolve this issue would be  (a) dropping the
> support for Hadoop 2.2.0
> > An alternative approach (b) would be:
> >  - ship a binary version for Hadoop 2.3.0
> >  - make the source of Flink still compatible with 2.2.0, so that users
> can compile a Hadoop 2.2.0 version if needed.
> >
> > I would vote for approach (a).
> >
> >
> > On Tue, Sep 1, 2015 at 5:01 PM, Till Rohrmann 
> wrote:
> > While working on high availability (HA) for Flink's YARN execution I
> stumbled across some limitations with Hadoop 2.2.0. From version 2.2.0 to
> 2.3.0, Hadoop introduced new functionality which is required for an
> efficient HA implementation. Therefore, I was wondering whether there is
> actually a need to support Hadoop 2.2.0. Is Hadoop 2.2.0 still actively
> used by someone?
> >
> > Cheers,
> > Till
> >
>
>
>
>
>
>


Re: Efficiency for Filter then Transform ( filter().map() vs flatMap() )

2015-09-04 Thread Welly Tambunan
Hi Stephan,

Cheers

On Fri, Sep 4, 2015 at 2:31 PM, Stephan Ewen  wrote:

> We will definitely also try to get the chaining overhead down a bit.
>
> BTW: To reach this kind of throughput, you need sources that can produce
> very fast...
>
> On Fri, Sep 4, 2015 at 12:20 AM, Welly Tambunan  wrote:
>
>> Hi Stephan,
>>
>> That's good information to know. We will hit that throughput easily. Our
>> computation graph has a lot of chaining like this right now.
>> I think it's safe to minimize the chaining right now.
>>
>> Thanks a lot for this Stephan.
>>
>> Cheers
>>
>> On Thu, Sep 3, 2015 at 7:20 PM, Stephan Ewen  wrote:
>>
>>> In a set of benchmarks a while back, we found that the chaining
>>> mechanism has some overhead right now, because of its abstraction. The
>>> abstraction creates iterators for each element and makes it hard for the
>>> JIT to specialize on the operators in the chain.
>>>
>>> For purely local chains at full speed, this overhead is observable (can
>>> decrease throughput from 25mio elements/core to 15-20mio elements per
>>> core). If your job does not reach that throughput, or is I/O bound, source
>>> bound, etc, it does not matter.
>>>
>>> If you care about super high performance, collapsing the code into one
>>> function helps.
>>>
>>> On Thu, Sep 3, 2015 at 5:59 AM, Welly Tambunan 
>>> wrote:
>>>
 Hi Gyula,

 Thanks for your response. It seems I will use filter and map for now, as
 that one makes the intention clear and is not a big performance hit.

 Thanks again.

 Cheers

 On Thu, Sep 3, 2015 at 10:29 AM, Gyula Fóra 
 wrote:

> Hey Welly,
>
> If you call filter and map one after the other like you mentioned,
> these operators will be chained and executed as if they were running in 
> the
> same operator.
> The only small performance overhead comes from the fact that the
> output of the filter will be copied before passing it as input to the map
> to keep immutability guarantees (but no serialization/deserialization will
> happen). Copying might be practically free depending on your data type,
> though.
>
> If you are using operators that don't make use of the immutability of
> inputs/outputs (i.e. you don't hold references to those values), then you
> can
> disable copying altogether by calling env.getConfig().enableObjectReuse(),
> in which case they will have exactly the same performance.
>
> Cheers,
> Gyula
>
> Welly Tambunan  ezt írta (időpont: 2015. szept.
> 3., Cs, 4:33):
>
>> Hi All,
>>
>> I would like to filter some item from the event stream. I think there
>> are two ways doing this.
>>
>> Using the regular pipeline filter(...).map(...). We can also use
>> flatMap for doing both in the same operator.
>>
>> Any performance improvement if we are using flatMap ? As that will be
>> done in one operator instance.
>>
>>
>> Cheers
>>
>>
>> --
>> Welly Tambunan
>> Triplelands
>>
>> http://weltam.wordpress.com
>> http://www.triplelands.com 
>>
>


 --
 Welly Tambunan
 Triplelands

 http://weltam.wordpress.com
 http://www.triplelands.com 

>>>
>>>
>>
>>
>> --
>> Welly Tambunan
>> Triplelands
>>
>> http://weltam.wordpress.com
>> http://www.triplelands.com 
>>
>
>


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Efficiency for Filter then Transform ( filter().map() vs flatMap() )

2015-09-04 Thread Welly Tambunan
Hi Stephan,

Thanks for your clarification.

Basically we will have lots of sensors that will push this kind of data to a
queuing system (currently we are using RabbitMQ, but will soon move to
Kafka).
We will also use the same pipeline to process the historical data.

I also want to minimize the chaining, as the filter is doing very little
work. By keeping the pipeline short we can minimize db/external source hits
and cache local data efficiently.

Cheers

On Fri, Sep 4, 2015 at 2:58 PM, Welly Tambunan  wrote:

> Hi Stephan,
>
> Cheers
>
> On Fri, Sep 4, 2015 at 2:31 PM, Stephan Ewen  wrote:
>
>> We will definitely also try to get the chaining overhead down a bit.
>>
>> BTW: To reach this kind of throughput, you need sources that can produce
>> very fast...
>>
>> On Fri, Sep 4, 2015 at 12:20 AM, Welly Tambunan 
>> wrote:
>>
>>> Hi Stephan,
>>>
>>> That's good information to know. We will hit that throughput easily. Our
>>> computation graph has a lot of chaining like this right now.
>>> I think it's safe to minimize the chaining right now.
>>>
>>> Thanks a lot for this Stephan.
>>>
>>> Cheers
>>>
>>> On Thu, Sep 3, 2015 at 7:20 PM, Stephan Ewen  wrote:
>>>
 In a set of benchmarks a while back, we found that the chaining
 mechanism has some overhead right now, because of its abstraction. The
 abstraction creates iterators for each element and makes it hard for the
 JIT to specialize on the operators in the chain.

 For purely local chains at full speed, this overhead is observable (can
 decrease throughput from 25mio elements/core to 15-20mio elements per
 core). If your job does not reach that throughput, or is I/O bound, source
 bound, etc, it does not matter.

 If you care about super high performance, collapsing the code into one
 function helps.

 On Thu, Sep 3, 2015 at 5:59 AM, Welly Tambunan 
 wrote:

> Hi Gyula,
>
> Thanks for your response. It seems I will use filter and map for now, as
> that one makes the intention clear and is not a big performance hit.
>
> Thanks again.
>
> Cheers
>
> On Thu, Sep 3, 2015 at 10:29 AM, Gyula Fóra 
> wrote:
>
>> Hey Welly,
>>
>> If you call filter and map one after the other like you mentioned,
>> these operators will be chained and executed as if they were running in 
>> the
>> same operator.
>> The only small performance overhead comes from the fact that the
>> output of the filter will be copied before passing it as input to the map
>> to keep immutability guarantees (but no serialization/deserialization 
>> will
>> happen). Copying might be practically free depending on your data type,
>> though.
>>
>> If you are using operators that don't make use of the immutability of
>> inputs/outputs (i.e. you don't hold references to those values), then you
>> can
>> disable copying altogether by calling 
>> env.getConfig().enableObjectReuse(),
>> in which case they will have exactly the same performance.
>>
>> Cheers,
>> Gyula
>>
>> Welly Tambunan  ezt írta (időpont: 2015. szept.
>> 3., Cs, 4:33):
>>
>>> Hi All,
>>>
>>> I would like to filter some item from the event stream. I think
>>> there are two ways doing this.
>>>
>>> Using the regular pipeline filter(...).map(...). We can also use
>>> flatMap for doing both in the same operator.
>>>
>>> Any performance improvement if we are using flatMap ? As that will
>>> be done in one operator instance.
>>>
>>>
>>> Cheers
>>>
>>>
>>> --
>>> Welly Tambunan
>>> Triplelands
>>>
>>> http://weltam.wordpress.com
>>> http://www.triplelands.com 
>>>
>>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


>>>
>>>
>>> --
>>> Welly Tambunan
>>> Triplelands
>>>
>>> http://weltam.wordpress.com
>>> http://www.triplelands.com 
>>>
>>
>>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>



-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Question on flink and hdfs

2015-09-04 Thread Welly Tambunan
Hi Jerry,

yes, that's possible. You can download the appropriate version
https://flink.apache.org/downloads.html

Cheers

On Fri, Sep 4, 2015 at 1:57 AM, Jerry Peng 
wrote:

> Hello,
>
> Does flink require hdfs to run? I know you can use hdfs to checkpoint and
> process files in a distributed fashion.  So can flink run standalone
> without hdfs?
>



-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Usage of Hadoop 2.2.0

2015-09-04 Thread Maximilian Michels
+1 for dropping Hadoop 2.2.0 binary and source-compatibility. The
release is hardly used and complicates the important high-availability
changes in Flink.

On Fri, Sep 4, 2015 at 9:33 AM, Stephan Ewen  wrote:
> I am good with that as well. Mind that we are not only dropping a binary
> distribution for Hadoop 2.2.0, but also the source compatibility with 2.2.0.
>
>
>
> Lets also reconfigure Travis to test
>
>  - Hadoop1
>  - Hadoop 2.3
>  - Hadoop 2.4
>  - Hadoop 2.6
>  - Hadoop 2.7
>
>
> On Fri, Sep 4, 2015 at 6:19 AM, Chiwan Park  wrote:
>>
>> +1 for dropping Hadoop 2.2.0
>>
>> Regards,
>> Chiwan Park
>>
>> > On Sep 4, 2015, at 5:58 AM, Ufuk Celebi  wrote:
>> >
>> > +1 to what Robert said.
>> >
>> > On Thursday, September 3, 2015, Robert Metzger 
>> > wrote:
>> > I think most cloud providers moved beyond Hadoop 2.2.0.
>> > Google's Click-To-Deploy is on 2.4.1
>> > AWS EMR is on 2.6.0
>> >
>> > The situation for the distributions seems to be the following:
>> > MapR 4 uses Hadoop 2.4.0 (current is MapR 5)
>> > CDH 5.0 uses 2.3.0 (the current CDH release is 5.4)
>> >
>> > HDP 2.0  (October 2013) is using 2.2.0
>> > HDP 2.1 (April 2014) uses 2.4.0 already
>> >
>> > So both vendors and cloud providers are multiple releases away from
>> > Hadoop 2.2.0.
>> >
>> > Spark does not offer a binary distribution lower than 2.3.0.
>> >
>> > In addition to that, I don't think that the HDFS client in 2.2.0 is
>> > really usable in production environments. Users were reporting
>> > ArrayIndexOutOfBounds exceptions for some jobs, I also had these exceptions
>> > sometimes.
>> >
>> > The easiest approach  to resolve this issue would be  (a) dropping the
>> > support for Hadoop 2.2.0
>> > An alternative approach (b) would be:
>> >  - ship a binary version for Hadoop 2.3.0
>> >  - make the source of Flink still compatible with 2.2.0, so that users
>> > can compile a Hadoop 2.2.0 version if needed.
>> >
>> > I would vote for approach (a).
>> >
>> >
>> > On Tue, Sep 1, 2015 at 5:01 PM, Till Rohrmann 
>> > wrote:
>> > While working on high availability (HA) for Flink's YARN execution I
>> > stumbled across some limitations with Hadoop 2.2.0. From version 2.2.0 to
>> > 2.3.0, Hadoop introduced new functionality which is required for an
>> > efficient HA implementation. Therefore, I was wondering whether there is
>> > actually a need to support Hadoop 2.2.0. Is Hadoop 2.2.0 still actively 
>> > used
>> > by someone?
>> >
>> > Cheers,
>> > Till
>> >
>>
>>
>>
>>
>>
>


Re: Question on flink and hdfs

2015-09-04 Thread Maximilian Michels
Hi Jerry,

If you don't want to use Hadoop, simply pick _any_ Flink version. We
recommend the Hadoop 1 version because it contains the fewest dependencies,
i.e. you need to download less and the installation occupies less space.
Other than that, it doesn't really matter if you don't use the HDFS
functionality, i.e. you don't access hdfs:// paths.
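For example (a hedged sketch, not from the original mail -- the paths and the
namenode address are made up), the same read API works for local files and
for HDFS, so HDFS only comes into play when you actually point Flink at an
hdfs:// URI:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class LocalVsHdfsInput {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // works on a plain standalone setup, no Hadoop/HDFS needed
        DataSet<String> local = env.readTextFile("file:///tmp/input.txt");
        local.writeAsText("file:///tmp/output");

        // this variant needs a reachable HDFS cluster (hypothetical address)
        DataSet<String> onHdfs = env.readTextFile("hdfs://namenode:9000/data/input.txt");
        onHdfs.writeAsText("hdfs://namenode:9000/data/output");

        env.execute("local vs hdfs paths");
    }
}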

Cheers,
Max

On Fri, Sep 4, 2015 at 10:08 AM, Welly Tambunan  wrote:

> Hi Jerry,
>
> yes, that's possible. You can download the appropriate version
> https://flink.apache.org/downloads.html
>
> Cheers
>
> On Fri, Sep 4, 2015 at 1:57 AM, Jerry Peng 
> wrote:
>
>> Hello,
>>
>> Does flink require hdfs to run? I know you can use hdfs to checkpoint and
>> process files in a distributed fashion.  So can flink run standalone
>> without hdfs?
>>
>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


How to create a stream of data batches

2015-09-04 Thread Andres R. Masegosa
Hi,

I'm trying to code some machine learning algorithms on top of Flink, such
as variational Bayes learning algorithms. Instead of working at a data
element level (i.e. using map transformations), it would be far more
efficient to work at a "batch of elements" level (i.e. I get a batch of
elements and I produce some output).

I could code that using the "mapPartition" function, but I cannot control
the size of the partitions, can I?

Is there any way to transform a stream (or DataSet) of elements in a
stream (or DataSet) of data batches with the same size?


Thanks for your support,
Andres


Re: Usage of Hadoop 2.2.0

2015-09-04 Thread Matthias J. Sax
+1 for dropping

On 09/04/2015 11:04 AM, Maximilian Michels wrote:
> +1 for dropping Hadoop 2.2.0 binary and source-compatibility. The
> release is hardly used and complicates the important high-availability
> changes in Flink.
> 
> On Fri, Sep 4, 2015 at 9:33 AM, Stephan Ewen  wrote:
>> I am good with that as well. Mind that we are not only dropping a binary
>> distribution for Hadoop 2.2.0, but also the source compatibility with 2.2.0.
>>
>>
>>
>> Lets also reconfigure Travis to test
>>
>>  - Hadoop1
>>  - Hadoop 2.3
>>  - Hadoop 2.4
>>  - Hadoop 2.6
>>  - Hadoop 2.7
>>
>>
>> On Fri, Sep 4, 2015 at 6:19 AM, Chiwan Park  wrote:
>>>
>>> +1 for dropping Hadoop 2.2.0
>>>
>>> Regards,
>>> Chiwan Park
>>>
 On Sep 4, 2015, at 5:58 AM, Ufuk Celebi  wrote:

 +1 to what Robert said.

 On Thursday, September 3, 2015, Robert Metzger 
 wrote:
 I think most cloud providers moved beyond Hadoop 2.2.0.
 Google's Click-To-Deploy is on 2.4.1
 AWS EMR is on 2.6.0

 The situation for the distributions seems to be the following:
 MapR 4 uses Hadoop 2.4.0 (current is MapR 5)
 CDH 5.0 uses 2.3.0 (the current CDH release is 5.4)

 HDP 2.0  (October 2013) is using 2.2.0
 HDP 2.1 (April 2014) uses 2.4.0 already

 So both vendors and cloud providers are multiple releases away from
 Hadoop 2.2.0.

 Spark does not offer a binary distribution lower than 2.3.0.

 In addition to that, I don't think that the HDFS client in 2.2.0 is
 really usable in production environments. Users were reporting
 ArrayIndexOutOfBounds exceptions for some jobs, I also had these exceptions
 sometimes.

 The easiest approach  to resolve this issue would be  (a) dropping the
 support for Hadoop 2.2.0
 An alternative approach (b) would be:
  - ship a binary version for Hadoop 2.3.0
  - make the source of Flink still compatible with 2.2.0, so that users
 can compile a Hadoop 2.2.0 version if needed.

 I would vote for approach (a).


 On Tue, Sep 1, 2015 at 5:01 PM, Till Rohrmann 
 wrote:
 While working on high availability (HA) for Flink's YARN execution I
 stumbled across some limitations with Hadoop 2.2.0. From version 2.2.0 to
 2.3.0, Hadoop introduced new functionality which is required for an
 efficient HA implementation. Therefore, I was wondering whether there is
 actually a need to support Hadoop 2.2.0. Is Hadoop 2.2.0 still actively 
 used
 by someone?

 Cheers,
 Till

>>>
>>>
>>>
>>>
>>>
>>





Re: How to create a stream of data batches

2015-09-04 Thread Matthias J. Sax
Hi Andres,

you could do this by using your own data type, for example

> public class MyBatch<T> {
>   private ArrayList<T> data = new ArrayList<T>();
> }

In the DataSource, you need to specify your own InputFormat that reads
multiple tuples into a batch and emits the whole batch at once.

However, be aware that this POJO type hides the batch nature from
Flink, i.e., Flink does not know anything about the tuples in the batch.
To Flink a batch is a single tuple. If you want to perform key-based
operations, this might become a problem.
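For the DataSet side, a minimal sketch of the chunking idea (not from the
original mail; it assumes MyBatch<T> exposes add(T) and size(), the helper
name is made up, and batches are formed per partition, so the last batch of
a partition may be smaller):

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Collector;

public class Batching {

    // Groups the elements of each partition into fixed-size MyBatch objects.
    // Flink may need a .returns(...) hint for the generic result type.
    public static <T> DataSet<MyBatch<T>> toBatches(DataSet<T> input, final int batchSize) {
        return input.mapPartition(new MapPartitionFunction<T, MyBatch<T>>() {
            @Override
            public void mapPartition(Iterable<T> values, Collector<MyBatch<T>> out) {
                MyBatch<T> batch = new MyBatch<T>();
                for (T value : values) {
                    batch.add(value);
                    if (batch.size() == batchSize) {  // emit a full batch
                        out.collect(batch);
                        batch = new MyBatch<T>();
                    }
                }
                if (batch.size() > 0) {               // emit the trailing, smaller batch
                    out.collect(batch);
                }
            }
        });
    }
}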

-Matthias

On 09/04/2015 01:00 PM, Andres R. Masegosa  wrote:
> Hi,
> 
> I'm trying to code some machine learning algorithms on top of flink such
> as a variational Bayes learning algorithms. Instead of working at a data
> element level (i.e. using map transformations), it would be far more
> efficient to work at a "batch of elements" levels (i.e. I get a batch of
> elements and I produce some output).
> 
> I could code that using the "mapPartition" function, but I cannot control
> the size of the partitions, can I?
> 
> Is there any way to transform a stream (or DataSet) of elements in a
> stream (or DataSet) of data batches with the same size?
> 
> 
> Thanks for your support,
> Andres
> 





Re: How to create a stream of data batches

2015-09-04 Thread Stephan Ewen
Interesting question, you are the second to ask that.

Batching in user code is a way, as Matthias said. We have on the roadmap a
way to transform a stream to a set of batches, but it will be a bit until
this is in. See
https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams

What kind of operation do you want to do on the batch? Will the batches
communicate (repartition, group, join), or will they only work
partition-local?

On Fri, Sep 4, 2015 at 1:12 PM, Matthias J. Sax  wrote:

> Hi Andres,
>
> you could do this by using your own data type, for example
>
> > public class MyBatch<T> {
> >   private ArrayList<T> data = new ArrayList<T>();
> > }
>
> In the DataSource, you need to specify your own InputFormat that reads
> multiple tuples into a batch and emits the whole batch at once.
>
> However, be aware, that this POJO type hides the batch nature from
> Flink, ie, Flink does not know anything about the tuples in the batch.
> To Flink a batch is a single tuple. If you want to perform key-based
> operations, this might become a problem.
>
> -Matthias
>
> On 09/04/2015 01:00 PM, Andres R. Masegosa  wrote:
> > Hi,
> >
> > I'm trying to code some machine learning algorithms on top of flink such
> > as a variational Bayes learning algorithms. Instead of working at a data
> > element level (i.e. using map transformations), it would be far more
> > efficient to work at a "batch of elements" levels (i.e. I get a batch of
> > elements and I produce some output).
> >
> > I could code that using the "mapPartition" function, but I cannot control
> > the size of the partitions, can I?
> >
> > Is there any way to transform a stream (or DataSet) of elements in a
> > stream (or DataSet) of data batches with the same size?
> >
> >
> > Thanks for your support,
> > Andres
> >
>
>


Convergence Criterion in IterativeDataSet

2015-09-04 Thread Andres R. Masegosa
Hi,


I am trying to implement some machine learning algorithms that involve
several iterations until convergence (to a fixed point).

My idea is to use an IterativeDataSet with an Aggregator which produces
the result (i.e. a set of parameters defining the model).

From the interface "ConvergenceCriterion", I can understand that the
convergence criterion only depends on the result of the aggregator in
the current iteration (as happens with the DoubleZeroConvergence class).

However, it is more usual to test convergence by comparing the result of
the aggregator in the current iteration with the result of the
aggregator in the previous iteration (one usually stops when both
results are similar enough and we have converged to a fixed point).

I guess this functionality is not included yet, and this is because the
convergence criterion of the Flink implementations of K-Means and Linear
Regression is to stop after a fixed number of iterations.


Am I wrong?


Regards
Andres


Re: Convergence Criterion in IterativeDataSet

2015-09-04 Thread Stephan Ewen
I think you can do this with the current interface. The convergence
criterion object stays around, so you should be able to simply store the
current aggregator value in a field (when the check is invoked). Any round
but the first could compare against that field.
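A minimal sketch of that idea (not from the original mail; it assumes the
aggregate is a DoubleValue and uses a made-up epsilon threshold):

import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.types.DoubleValue;

public class FixedPointConvergence implements ConvergenceCriterion<DoubleValue> {

    private final double epsilon;
    private Double previous;  // aggregate of the previous iteration, null in the first round

    public FixedPointConvergence(double epsilon) {
        this.epsilon = epsilon;
    }

    @Override
    public boolean isConverged(int iteration, DoubleValue value) {
        double current = value.getValue();
        // converged once two consecutive aggregates are close enough
        boolean converged = previous != null && Math.abs(current - previous) < epsilon;
        previous = current;   // remember for the next check
        return converged;
    }
}

It would then be registered together with the aggregator, e.g. via
IterativeDataSet#registerAggregationConvergenceCriterion(...).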

On Fri, Sep 4, 2015 at 2:25 PM, Andres R. Masegosa  wrote:

> Hi,
>
>
> I trying to implement some machine learning algorithms that involve
> several iterations until convergence (to a fixed point).
>
> My idea is to use a IterativeDataSet with an Aggregator which produces
> the result (i.e. a set of parameters defining the model).
>
> From the interface "ConvergenceCriterion", I can understand that the
> convergence criterion only depends on the result of the aggregator in
> the current iteration (as happens with the DoubleZeroConvergence class).
>
> However, it is more usual to test convergence by comparing the result of
> the aggregator in the current iteration with the result of the
> aggregator in the previous iteration (one usually stops when both
> results are similar enough and we have converged to a fixed point).
>
> I guess this functionality is not included yet. And this is because the
> convergence criteria of flink implementations of K-Means and Linear
> Regression is to stop after a fixed number of iterations.
>
>
> Am I wrong?
>
>
> Regards
> Andres
>


Re: Convergence Criterion in IterativeDataSet

2015-09-04 Thread Sachin Goel
Hi Andres
Does something like this solve what you're trying to achieve?
https://github.com/apache/flink/pull/918/files

Regards
Sachin
On Sep 4, 2015 6:24 PM, "Stephan Ewen"  wrote:

> I think you can do this with the current interface. The convergence
> criterion object stays around, so you should be able to simply store the
> current aggregator value in a field (when the check is invoked). Any round
> but the first could compare against that field.
>
> On Fri, Sep 4, 2015 at 2:25 PM, Andres R. Masegosa 
> wrote:
>
>> Hi,
>>
>>
>> I trying to implement some machine learning algorithms that involve
>> several iterations until convergence (to a fixed point).
>>
>> My idea is to use a IterativeDataSet with an Aggregator which produces
>> the result (i.e. a set of parameters defining the model).
>>
>> From the interface "ConvergenceCriterion", I can understand that the
>> convergence criterion only depends on the result of the aggregator in
>> the current iteration (as happens with the DoubleZeroConvergence class).
>>
>> However, it is more usual to test convergence by comparing the result of
>> the aggregator in the current iteration with the result of the
>> aggregator in the previous iteration (one usually stops when both
>> results are similar enough and we have converged to a fixed point).
>>
>> I guess this functionality is not included yet. And this is because the
>> convergence criteria of flink implementations of K-Means and Linear
>> Regression is to stop after a fixed number of iterations.
>>
>>
>> Am I wrong?
>>
>>
>> Regards
>> Andres
>>
>
>


BloomFilter Exception

2015-09-04 Thread Flavio Pompermaier
Hi to all,
running a job with Flink 0.10-SNAPSHOT I got the following Exception:

java.lang.IllegalArgumentException: expectedEntries should be > 0
at
org.apache.flink.shaded.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
at
org.apache.flink.runtime.operators.util.BloomFilter.<init>(BloomFilter.java:53)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.initBloomFilter(MutableHashTable.java:711)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.initTable(MutableHashTable.java:1073)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:778)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:530)
at
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:563)
at
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
at
org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:208)
at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)

Is it a bug?

Best,
Flavio


Re: Flink to ingest from Kafka to HDFS?

2015-09-04 Thread Aljoscha Krettek
Hi,
I have an open Pull Request for a RollingFile sink. It is integrated with
checkpointing, so it can provide exactly-once behavior. If you're
interested, please check it out: https://github.com/apache/flink/pull/1084

Cheers,
Aljoscha

On Wed, 26 Aug 2015 at 10:31 Stephan Ewen  wrote:

> BTW: We are currently working on adding a rolling-file HDFS sink to Flink
> that will initially work very similar as what flume gives you. If I
> understand it correctly, Flume may have duplicates in the output from
> incomplete flushes on failures.
>
> We have actually a design to extend this later to a proper "exactly once"
> sink, integrated into Flink's checkpointing, which discards duplicates
> properly by offset tracking and truncating/compacting.
>
>
> On Wed, Aug 26, 2015 at 10:04 AM, Hans-Peter Zorn 
> wrote:
>
>> Hi Stephan,
>>
>> even though I started the discussion, I was just trying to estimate the
>> effort. In that project they finally opted to use flume with a Kafka
>> channel.
>>
>> Best, Hans-Peter
>>
>> On Wed, Aug 26, 2015 at 9:52 AM, LINZ, Arnaud 
>> wrote:
>>
>>> Hi Stephen,
>>>
>>>
>>>
>>> I do not have a Kafka->HDFS solution, but I do have a streaming sink
>>> that writes to HDFS (external, text hive table) with auto-partitioning and
>>> rolling files. However, it does not take care of checkpointing and may have
>>> flushing issues if some partitions are seldom seen.
>>>
>>>
>>>
>>> I’m not sure it will save you much time, especially given the fact that
>>> it has not been really used yet.
>>>
>>>
>>>
>>> Code is provided with no copyright and no warranty!
>>>
>>>
>>>
>>> import java.io.BufferedOutputStream;
>>> import java.io.IOException;
>>> import java.util.HashMap;
>>> import java.util.Map;
>>>
>>> import org.apache.commons.io.IOUtils;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.core.fs.FileSystem;
>>> import org.apache.flink.core.fs.Path;
>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>> import org.apache.hive.hcatalog.data.DefaultHCatRecord;
>>> import org.apache.hive.hcatalog.data.schema.HCatSchema;
>>> import org.joda.time.DateTime;
>>>
>>> /**
>>>  * This sink streams data to a HDFS directory (hive external table) with
>>>  * a size limit (rolling files) and automatic partitioning. To be able to
>>>  * read the file content while it's still being written, an idea is to add
>>>  * a char(1) field in the last position of the hive line and to check if
>>>  * it has the proper value when read (if not, the line is not complete).
>>>  *
>>>  * @author alinz
>>>  */
>>> public class HiveStreamOutput extends RichSinkFunction> {
>>>
>>>     /**
>>>      * The Class StreamingFile, encapsulates an open output hdfs file
>>>      */
>>>     public static class StreamingFile {
>>>
>>>         /** base directory */
>>>         private final String rootPath;
>>>         /** prefix */
>>>         private final String prefix;
>>>
>>>         /** file path */
>>>         private Path path;
>>>
>>>         /** open output stream */
>>>         private BufferedOutputStream stream;
>>>
>>>         /** current size */
>>>         private long size = 0;
>>>
>>>         /** current file number */
>>>         private long nbFile = 0;
>>>
>>>         /** instant of the last writing on this stream. If the interval
>>>          *  is too long, flushes content */
>>>         private long lastInvoke;
>>>
>>>         /**
>>>          * Instantiates a new streaming file.
>>>          * @param rootPath destination path
>>>          * @param prefix file name prefix
>>>          * @throws IOException cannot open file
>>>          */
>>>         public StreamingFile(String rootPath, String prefix) throws IOException {
>>>             super();
>>>             this.rootPath = rootPath;
>>>             this.prefix = prefix;
>>>             lastInvoke = 0; // always flushes first record
>>>             open();
>>>         }
>>>
>>>         /**
>>>          * Create destination file on FS
>>>          * @throws IOException issue when opening file
>>>          */
>>>         private void open() throws IOException {
>>>             this.path = new Path(rootPath, prefix + nbFile);
>>>             final FileSystem filesys = path.getFileSystem();
>>>             filesys.mkdirs(path.getParent());
>>>             stream = new BufferedOutputStream(filesys.create(path, true));
>>>         }
>>>
>>>         /**
>>>          * closes stream
>>>          */
>>>         public void closeStream() {
>>>             IOUtils.closeQui

Flink join with external source

2015-09-04 Thread Jerry Peng
Hello,

Does Flink currently support operators that use Redis? If I am using the
streaming API in Flink and I need to look up something in a Redis database
during the processing of the stream, how can I do that?


Fwd: Memory management issue

2015-09-04 Thread Ricarda Schueler


Hi All,

We're running into a memory management issue when using the
iterateWithTermination function.
Using a small amount of data, everything works perfectly fine. However,
as soon as the main memory is filled up on a worker, nothing seems to be
happening any more. Once this happens, any worker whose memory is full
will have its CPU workload drop to a minimum (<5%), while maintaining a
full memory with no apparent garbage collection happening and thus the
memory remaining full. All Tasks within this iteration are set to
started, yet none of them actually do anything measurable.
While runs with slightly less data (so that all intermediate results
barely fit into main memory) finished within minutes, runs where the
data would no longer fit would run for days with no results in sight.
When using fewer workers or even running the algorithm locally, this
issue already appears when using less data, which the larger cluster
(with more combined memory) could still handle.

Our code can be found at [1].

Best regards
Ricarda

[1]: https://github.com/DBDA15/graph-mining/tree/master/graph-mining-flink





Using Flink with Redis question

2015-09-04 Thread Jerry Peng
Hello,

So I am trying to use jedis (redis java client) with Flink streaming api,
but I get an exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)

at org.apache.flink.client.program.Client.run(Client.java:278)

at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)

at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)

at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)

at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)

Caused by: org.apache.flink.api.common.InvalidProgramException: Object
flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not
serializable

at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)

at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320)

at
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144)

at
org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624)

at
flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:483)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)

... 6 more

Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)

at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306)

at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)

... 16 more





so my code I am using:


public static class RedisJoinBolt implements
        FlatMapFunction, Tuple6> {
    private Jedis jedis;
    private HashMap ad_to_campaign;

    public RedisJoinBolt(String jedisServer) {
        // initialize jedis
        this.jedis = new Jedis(jedisServer);
    }

    @Override
    public void flatMap(Tuple5 input,
                        Collector> out) throws Exception {
        ...


Anyone know a fix for this?


Re: Using Flink with Redis question

2015-09-04 Thread Jay Vyas
Maybe wrapping Jedis with a serializable class will do the trick?

But in general, is there a way to reference jar classes in Flink apps without
making them serializable?


> On Sep 4, 2015, at 1:36 PM, Jerry Peng  wrote:
> 
> Hello,
> 
> So I am trying to use jedis (redis java client) with Flink streaming api, but 
> I get an exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>   at org.apache.flink.client.program.Client.run(Client.java:278)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object 
> flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not 
> serializable
>   at 
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>   at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320)
>   at 
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144)
>   at 
> org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624)
>   at 
> flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:483)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>   ... 6 more
> Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306)
>   at 
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>   ... 16 more
> 
> 
> 
> 
> so my code I am using: 
> 
> public static class RedisJoinBolt implements FlatMapFunction String,String,String,String>
> , Tuple6> {
>  private Jedis jedis;
>  private HashMap ad_to_campaign;
> 
>  public RedisJoinBolt(String jedisServer) {
>   //initialize jedis
>   this.jedis = new Jedis(jedisServer);
>  }
> 
>  @Override
>  public void flatMap(Tuple5 input,
>Collector> out) 
> throws Exception {
> .
> .
> .
> 
> Any one know a fix for this?


Re: Using Flink with Redis question

2015-09-04 Thread Márton Balassi
Hey Jerry,

Jay is on the right track, this issue has to do with the Flink operator
life cycle. When you hit execute, all your user-defined classes get
serialized, so that they can be shipped to the workers on the cluster. To
execute some code before your FlatMapFunction starts processing the data
you can use the open() function of the RichFlatMapFunction, thus enabling
you to make the Jedis attribute transient:

public static class RedisJoinBolt extends
        RichFlatMapFunction, Tuple6> {

    private transient Jedis jedis;
    private String jedisServer;
    private HashMap ad_to_campaign;

    public RedisJoinBolt(String jedisServer) {
        // only keep the (serializable) server address here
        this.jedisServer = jedisServer;
    }

    @Override
    public void open(Configuration parameters) {
        // initialize jedis on the worker, after deserialization
        this.jedis = new Jedis(jedisServer);
    }

    @Override
    public void flatMap(Tuple5 input,
                        Collector> out) throws Exception {
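A hypothetical call site (the stream variable and the Redis host below are
made up for illustration):

    messages.flatMap(new RedisJoinBolt("localhost"));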


On Fri, Sep 4, 2015 at 8:11 PM, Jay Vyas 
wrote:

> Maybe wrapping Jedis with a serializable class will do the trick?
>
> But in general is there a way to reference jar classes  in flink apps
> without serializable them?
>
>
> On Sep 4, 2015, at 1:36 PM, Jerry Peng 
> wrote:
>
> Hello,
>
> So I am trying to use jedis (redis java client) with Flink streaming api,
> but I get an exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>
> at org.apache.flink.client.program.Client.run(Client.java:278)
>
> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
>
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
>
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
>
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object
> flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not
> serializable
>
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320)
>
> at
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144)
>
> at
> org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624)
>
> at
> flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:483)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>
> ... 6 more
>
> Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis
>
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306)
>
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>
> ... 16 more
>
>
>
>
>
> so my code I am using:
>
>
> public static class RedisJoinBolt implements FlatMapFunction String,String,String,String>
> , Tuple6> {
>  private Jedis jedis;
>  private HashMap ad_to_campaign;
>
>  public RedisJoinBolt(String jedisServer) {
>   //initialize jedis
>   this.jedis = new Jedis(jedisServer);
>  }
>
>  @Override
>  public void flatMap(Tuple5 input,
>Collector> out) 
> throws Exception {
>
> .
>
> .
>
> .
>
>
> Any one know a fix for this?
>
>