Re: Flink ML 1.0.0 - Saving and Loading Models to Score a Single Feature Vector

2016-04-12 Thread KirstiLaurila
How should this be done for the recommendation engine (that is, ALS; example here:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/ml/als.html
)?

 I am able to run the example with my example data but cannot get anything
written to any file (user or item matrices). 

Basically, I have tried something like this




I also tried to apply a similar approach to this



but with no success. Could someone help me with this to get my model saved?


Best,
Kirsti



Trevor Grant wrote
> I'm just about to open an issue / PR solution for 'warm-starts'
> 
> Once this is in, we could just add a setter for the weight vector (and
> whatever iteration you're on if you're going to do more partial fits).
> 
> Then all you need to save is your weight vector (and iteration number).
> 
> 
> 
> Trevor Grant
> Data Scientist
> https://github.com/rawkintrevo
> http://stackexchange.com/users/3002022/rawkintrevo
> http://trevorgrant.org
> 
> *"Fortunate is he, who is able to know the causes of things."  -Virgil*
> 
> 
> On Fri, Apr 8, 2016 at 9:04 AM, Behrouz Derakhshan <

> behrouz.derakhshan@

>> wrote:
> 
>> Is there a reason the Predictor or Estimator classes don't have read and
>> write methods for saving and retrieving the model? I couldn't find Jira
>> issues for it. Does it make sense to create one?
>>
>> BR,
>> Behrouz
>>
>> On Wed, Mar 30, 2016 at 4:40 PM, Till Rohrmann <

> trohrmann@

> >
>> wrote:
>>
>>> Yes, Suneel is completely right. If the data does not implement
>>> IOReadableWritable it is probably easier to use the
>>> TypeSerializerOutputFormat. What you need here to serialize the data is a
>>> TypeSerializer. You can obtain it the following way:
>>>
>>> val model = mlr.weightsOption.get
>>>
>>> val weightVectorTypeInfo = TypeInformation.of(classOf[WeightVector])
>>> val weightVectorSerializer = weightVectorTypeInfo.createSerializer(new
>>> ExecutionConfig())
>>> val outputFormat = new TypeSerializerOutputFormat[WeightVector]
>>> outputFormat.setSerializer(weightVectorSerializer)
>>>
>>> model.write(outputFormat, "path")
>>>
>>> Cheers,
>>> Till
>>> ​
>>>
>>> On Tue, Mar 29, 2016 at 8:22 PM, Suneel Marthi <

> smarthi@

> >
>>> wrote:
>>>
 U may want to use FlinkMLTools.persist() methods which use
 TypeSerializerFormat and don't enforce IOReadableWritable.



 On Tue, Mar 29, 2016 at 2:12 PM, Sourigna Phetsarath <
 

> gna.phetsarath@

>> wrote:

> Till,
>
> Thank you for your reply.
>
> Having this issue though, WeightVector does not extend
> IOReadableWritable:
>
> public class SerializedOutputFormat<T extends IOReadableWritable>
>
> case class WeightVector(weights: Vector, intercept: Double)
> extends Serializable {}
>
>
> However, I will use the approach to write out the weights as text.
>
>
> On Tue, Mar 29, 2016 at 5:01 AM, Till Rohrmann <

> trohrmann@

> >
> wrote:
>
>> Hi Gna,
>>
>> there are no utilities yet to do that but you can do it manually. In
>> the end, a model is simply a Flink DataSet which you can serialize to
>> some file. Upon reading this DataSet you simply have to give it to
>> your algorithm to be used as the model. The following code snippet
>> illustrates this approach:
>>
>> mlr.fit(inputDS, parameters)
>>
>> // write model to disk using the SerializedOutputFormat
>> mlr.weightsOption.get.write(new SerializedOutputFormat[WeightVector],
>> "path")
>>
>> // read the serialized model from disk
>> val model = env.readFile(new SerializedInputFormat[WeightVector],
>> "path")
>>
>> // set the read model for the MLR algorithm
>> mlr.weightsOption = model
>>
>> Cheers,
>> Till
>> ​
>>
>> On Tue, Mar 29, 2016 at 10:46 AM, Simone Robutti <
>> 

> simone.robutti@

>> wrote:
>>
>>> To my knowledge there is nothing like that. PMML is not supported in
>>> any form and there's no custom saving format yet. If you really need
>>> a
>>> quick and dirty solution, it's not that hard to serialize the model
>>> into a
>>> file.
>>>
>>> 2016-03-28 17:59 GMT+02:00 Sourigna Phetsarath <
>>> 

> gna.phetsarath@

>>:
>>>
 Flinksters,

 Is there an example of saving a Trained Model, loading a Trained
 Model and then scoring one or more feature vectors using Flink ML?

 All of the examples I've seen have shown only sequential fit and
 predict.

 Thank you.

 -Gna
 --


 *Gna Phetsarath*System Architect // AOL Platforms // Data Services
 // Applied Research Chapter
 770 Broadway, 5th Floor, New York, NY 10003
 o: 212.402.4871 // m: 917.373.7363
 vvmr: 8890237 aim: sphetsarat

Re: Flink ML 1.0.0 - Saving and Loading Models to Score a Single Feature Vector

2016-04-12 Thread Till Rohrmann
Hi Kirsti,

I think you attached some images to your mail which show the code.
Unfortunately this is not supported by the mailing list. So maybe you could
resend what you’ve already tried.

In order to access the ALS model, you can do the following:

val als = ALS()

als.fit(input)

val (userFactorsOpt, itemFactorsOpt) = als.factorsOption

val factorsTypeInfo = TypeInformation.of(classOf[Factors])
val factorsSerializer = factorsTypeInfo.createSerializer(new ExecutionConfig())
val outputFormat = new TypeSerializerOutputFormat[Factors]

userFactorsOpt match {
  case Some(userFactors) => userFactors.write(outputFormat, "user_path")
  case None =>
}

itemFactorsOpt match {
  case Some(itemFactors) => itemFactors.write(outputFormat, "item_path")
  case None =>
}
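
To load the persisted factors back into a program, a minimal sketch could look
like the following (assuming TypeSerializerInputFormat can be constructed from
the same TypeInformation; depending on the Flink version it may take a
TypeSerializer instead, and the paths are just placeholders):

// read the factors back; each file was written with the matching
// TypeSerializerOutputFormat above
val userInputFormat = new TypeSerializerInputFormat[Factors](factorsTypeInfo)
val itemInputFormat = new TypeSerializerInputFormat[Factors](factorsTypeInfo)

val userFactors = env.readFile(userInputFormat, "user_path")
val itemFactors = env.readFile(itemInputFormat, "item_path")

// hand the factors to a fresh ALS instance for predictions (assuming
// factorsOption is settable, analogous to mlr.weightsOption elsewhere in
// this thread)
val restoredAls = ALS()
restoredAls.factorsOption = Some((userFactors, itemFactors))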

Cheers,
Till
​

On Tue, Apr 12, 2016 at 10:29 AM, KirstiLaurila 
wrote:

> How should this be done for the recommendation engine (that is ALS, example
> here
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/ml/als.html
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/ml/als.html
> >
> ).
>
>  I am able to run the example with my example data but cannot get anything
> written to any file (user or item matrices).
>
> Basically, I have tried something like this
>
>
>
>
> Tried also to apply similar approach than this
>
>
>
> but with no success. Could someone help me with this to get my model saved?
>
>
> Best,
> Kirsti
>
>
>
> Trevor Grant wrote
> > I'm just about to open an issue / PR solution for 'warm-starts'
> >
> > Once this is in, we could just add a setter for the weight vector (and
> > what
> > ever iteration you're on if you're going to do more partial fits).
> >
> > Then all you need to save if your weight vector (and iter number).
> >
> >
> >
> > Trevor Grant
> > Data Scientist
> > https://github.com/rawkintrevo
> > http://stackexchange.com/users/3002022/rawkintrevo
> > http://trevorgrant.org
> >
> > *"Fortunate is he, who is able to know the causes of things."  -Virgil*
> >
> >
> > On Fri, Apr 8, 2016 at 9:04 AM, Behrouz Derakhshan <
>
> > behrouz.derakhshan@
>
> >> wrote:
> >
> >> Is there a reasons the Predictor or Estimator class don't have read and
> >> write methods for saving and retrieving the model? I couldn't find Jira
> >> issues for it. Does it make sense to create one ?
> >>
> >> BR,
> >> Behrouz
> >>
> >> On Wed, Mar 30, 2016 at 4:40 PM, Till Rohrmann <
>
> > trohrmann@
>
> > >
> >> wrote:
> >>
> >>> Yes Suneel is completely wright. If the data does not implement
> >>> IOReadableWritable it is probably easier to use the
> >>> TypeSerializerOutputFormat. What you need here to seralize the data is
> a
> >>> TypeSerializer. You can obtain it the following way:
> >>>
> >>> val model = mlr.weightsOption.get
> >>>
> >>> val weightVectorTypeInfo = TypeInformation.of(classOf[WeightVector])
> >>> val weightVectorSerializer = weightVectorTypeInfo.createSerializer(new
> >>> ExecutionConfig())
> >>> val outputFormat = new TypeSerializerOutputFormat[WeightVector]
> >>> outputFormat.setSerializer(weightVectorSerializer)
> >>>
> >>> model.write(outputFormat, "path")
> >>>
> >>> Cheers,
> >>> Till
> >>> ​
> >>>
> >>> On Tue, Mar 29, 2016 at 8:22 PM, Suneel Marthi <
>
> > smarthi@
>
> > >
> >>> wrote:
> >>>
>  U may want to use FlinkMLTools.persist() methods which use
>  TypeSerializerFormat and don't enforce IOReadableWritable.
> 
> 
> 
>  On Tue, Mar 29, 2016 at 2:12 PM, Sourigna Phetsarath <
> 
>
> > gna.phetsarath@
>
> >> wrote:
> 
> > Till,
> >
> > Thank you for your reply.
> >
> > Having this issue though, WeightVector does not extend
> > IOReadWriteable:
> >
> > *public* *class* SerializedOutputFormat<*T* *extends*
> > IOReadableWritable>
> >
> > *case* *class* WeightVector(weights: Vector, intercept: Double)
> > *extends* Serializable {}
> >
> >
> > However, I will use the approach to write out the weights as text.
> >
> >
> > On Tue, Mar 29, 2016 at 5:01 AM, Till Rohrmann <
>
> > trohrmann@
>
> > >
> > wrote:
> >
> >> Hi Gna,
> >>
> >> there are no utilities yet to do that but you can do it manually. In
> >> the end, a model is simply a Flink DataSet which you can serialize
> to
> >> some file. Upon reading this DataSet you simply have to give it to
> >> your algorithm to be used as the model. The following code snippet
> >> illustrates this approach:
> >>
> >> mlr.fit(inputDS, parameters)
> >>
> >> // write model to disk using the SerializedOutputFormat
> >> mlr.weightsOption.get.write(new
> SerializedOutputFormat[WeightVector],
> >> "path")
> >>
> >> // read the serialized model from disk
> >> val model = env.readFile(new SerializedInputFormat[WeightVector],
> >> "path")
> >>
> >> // set the read model for the MLR algorithm
> >> mlr.weightsOption = model
> >>
> >> Chee

Re: YARN session application attempts

2016-04-12 Thread Stefano Baghino
Hi Ufuk, sorry for taking an awful lot of time to reply but I fell behind
with the ML in the last couple of weeks due to lack of time.
First of all, thanks for taking the time to help me.

Yes, what I was saying was that, apparently from the code (and as we later
confirmed after a couple of tests), the "upper bound" cited by the
documentation seems invalid (meaning, the number of attempts is effectively
regulated by flink-conf.yaml and falls back to yarn-site.xml only if the key
is missing).

We're currently using Hadoop 2.7.1 so we'll try your solution, thanks.

I was also wondering if there's a way to ask it to retry indefinitely, so that
a long-running streaming job can endure as many job manager failures as
possible without ever needing human intervention to restart the YARN
session.
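
For reference, the two configuration keys being discussed look roughly like
this (a sketch; the values are only illustrative):

# flink-conf.yaml: attempts requested by the Flink YARN session
yarn.application-attempts: 10

<!-- yarn-site.xml: cluster-wide upper bound enforced by YARN -->
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>10</value>
</property>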

On Sat, Apr 2, 2016 at 3:53 PM, Ufuk Celebi  wrote:

> Hey Stefano,
>
> yarn.resourcemanager.am.max-attempts is a setting for your YARN
> cluster and cannot be influenced by Flink. Flink cannot set a higher
> number than this for yarn.application-attempts.
>
> The key that is set/overridden by Flink is probably only valid for the
> YARN session, but I'm not too familiar with the code. Maybe someone
> else can chime in.
>
> I would recommend using a newer Hadoop version (>= 2.6), where you can
> configure the failure validity interval, which counts the attempts per
> time interval, e.g. it is allowed to fail 2 times within X seconds.
> Per default, the failure validity interval is configured to the Akka
> timeout (which is per default 10s). I actually think it would make
> sense to increase this a little and leave the attempts at 1 or 2 (in
> the interval).
>
> Does this help?
>
> – Ufuk
>
>
> On Fri, Apr 1, 2016 at 3:24 PM, Stefano Baghino
>  wrote:
> > Hello everybody,
> >
> > I was asking myself: are there any best practices regarding how to set
> the
> > `yarn.application-attempts` configuration key when running Flink on YARN
> as
> > a long-running session? The configuration page on the docs states that 1
> is
> > the default and that it is recommended to leave it like that, however in
> the
> > case of a long running session it seems to me that the value should be
> > higher in order to actually allow the session to keep running despite Job
> > Managers failing.
> >
> > Furthermore, the HA page on the docs states the following
> >
> > """
> > It’s important to note that yarn.resourcemanager.am.max-attempts is an
> upper
> > bound for the application restarts. Therefore, the number of application
> > attempts set within Flink cannot exceed the YARN cluster setting with
> which
> > YARN was started.
> > """
> >
> > However, after some tests conducted by my colleagues and after looking at
> > the code (FlinkYarnClientBase:522-536) it seems to me that the
> > flink-conf.yaml key, if set, overrides the yarn-site.xml, which in turn
> > overrides the fallback value of 1. Is this right? Is the documentation
> > wrong?
> >
> > --
> > BR,
> > Stefano Baghino
> >
> > Software Engineer @ Radicalbit
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Flink ML 1.0.0 - Saving and Loading Models to Score a Single Feature Vector

2016-04-12 Thread KirstiLaurila
Hi, 

those parts were examples of what I had tried. I tried your suggestions,
but still no success. Additionally,
there were some problems:


val (userFactorsOpt, itemFactorsOpt) = als.factorsOption 

If I had just this, userFactorsOpt and itemFactorsOpt did not have a write
method, so I added get there, i.e.

val (userFactorsOpt, itemFactorsOpt) = als.factorsOption.get 


val factorsTypeInfo = TypeInformation.of(classOf[Factors])
val factorsSerializer = factorsTypeInfo.createSerializer(new
ExecutionConfig())
val outputFormat = new TypeSerializerOutputFormat[Factors]


Here, factorsSerializer was not used at all, so I guess this line was
missing:

outputFormat.setSerializer(factorsSerializer)


userFactorsOpt match {
case Some(userFactors) => userFactors.write(outputFormat, "user_path")
case None =>
}


This doesn't compile, giving the error message

Error:(71, 12) constructor cannot be instantiated to expected type;
 found   : Some[A]
 required:
org.apache.flink.api.scala.DataSet[org.apache.flink.ml.recommendation.ALS.Factors]
  case Some(userFactors) => userFactorsOpt.write(outputFormat,
"path_to_my_file")

However, I also tried without the match case, i.e.

userFactorsOpt.write(outputFormat, "path")

but nothing was written anywhere.







Re: Powered by Flink

2016-04-12 Thread Stefano Baghino
Hi Robert, thanks for bringing up the page.
We're in the process of releasing our first version and would like to be
added to the "Powered by" page: "Radicalbit  is an
OSS distribution that uses Flink for Fast Data processing"

On Wed, Apr 6, 2016 at 9:07 AM, Suneel Marthi  wrote:

> I was gonna hold off on that until we get Mahout 0.12.0 out of the door
> (targeted for this weekend).
>
> I would add Apache NiFi to the list.
>
> Future :
>
> Apache Mahout
> Apache BigTop
>
> Openstack and Kubernetes (skunkworks)
>
>
> On Wed, Apr 6, 2016 at 3:03 AM, Sebastian  wrote:
>
>> You should also add Apache Mahout, whose new Samsara DSL also runs on
>> Flink.
>>
>> -s
>>
>> On 06.04.2016 08:50, Henry Saputra wrote:
>>
>>> Thanks, Slim. I have just updated the wiki page with these entries.
>>>
>>> On Tue, Apr 5, 2016 at 10:20 AM, Slim Baltagi >> > wrote:
>>>
>>> Hi
>>>
>>> The following are missing in the ‘Powered by Flink’ list:
>>>
>>>   * king.com: https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces88
>>>   * Otto Group: http://data-artisans.com/how-we-selected-apache-flink-at-otto-group/
>>>   * Eura Nova: https://research.euranova.eu/flink-forward-2015-talk/
>>>   * Big Data Europe: http://www.big-data-europe.eu
>>>
>>> Thanks
>>>
>>> Slim Baltagi
>>>
>>>
>>> On Apr 5, 2016, at 10:08 AM, Robert Metzger >>> > wrote:

 Hi everyone,

 I would like to bring the "Powered by Flink" wiki page [1] to the
 attention of Flink user's who recently joined the Flink community.
 The list tracks which organizations are using Flink.
 If your company / university / research institute / ... is using
 Flink but the name is not yet listed there, let me know and I'll
 add the name.

 Regards,
 Robert

 [1]
 https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink


 On Mon, Oct 19, 2015 at 4:10 PM, Matthias J. Sax >>> > wrote:

 +1

 On 10/19/2015 04:05 PM, Maximilian Michels wrote:
 > +1 Let's collect in the Wiki for now. At some point in time,
 we might
 > want to have a dedicated page on the Flink homepage.
 >
 > On Mon, Oct 19, 2015 at 3:31 PM, Timo Walther
 mailto:twal...@apache.org>> wrote:
 >> Ah ok, sorry. I think linking to the wiki is also ok.
 >>
 >>
 >> On 19.10.2015 15:18, Fabian Hueske wrote:
 >>>
 >>> @Timo: The proposal was to keep the list in the wiki (can
 be easily
 >>> extended) but link from the main website to the wiki page.
 >>>
 >>> 2015-10-19 15:16 GMT+02:00 Timo Walther
 mailto:twal...@apache.org>>:
 >>>
  +1 for adding it to the website instead of wiki.
  "Who is using Flink?" is always a question difficult to
 answer to
  interested users.
 
 
  On 19.10.2015 15:08, Suneel Marthi wrote:
 
  +1 to this.
 
  On Mon, Oct 19, 2015 at 3:00 PM, Fabian Hueske
 mailto:fhue...@gmail.com>> wrote:
 
 > Sounds good +1
 >
 > 2015-10-19 14:57 GMT+02:00 Márton Balassi <
 mailto:balassi.mar...@gmail.com>>
 > balassi.mar...@gmail.com >:
 >
 >> Thanks for starting and big +1 for making it more
 prominent.
 >>
 >> On Mon, Oct 19, 2015 at 2:53 PM, Fabian Hueske <
 mailto:fhue...@gmail.com>>
 >
 > fhue...@gmail.com > wrote:
 >>>
 >>> Thanks for starting this Kostas.
 >>>
 >>> I think the list is quite hidden in the wiki. Should
 we link from
 >>> flink.apache.org  to that
 page?
 >>>
 >>> Cheers, Fabian
 >>>
 >>> 2015-10-19 14:50 GMT+02:00 Kostas Tzoumas <
 mailto:ktzou...@apache.org>>
 >
 > ktzou...@apache.org >:
 
  Hi everyone,
 
  I started a "Powered by Flink" wiki page, listing
 some of the
  organizations that are using Flink:
 
 

 https://cwiki.apache.org/

Re: Powered by Flink

2016-04-12 Thread Fabian Hueske
Done, thanks Stefano!

2016-04-12 15:38 GMT+02:00 Stefano Baghino :

> Hi Robert, thanks for bringing up the page.
> We're in the process of releasing our first version and would like to be
> added to the "Powered by" page: "Radicalbit  is an
> OSS distribution that uses Flink for Fast Data processing"
>
> On Wed, Apr 6, 2016 at 9:07 AM, Suneel Marthi  wrote:
>
>> I was gonna hold off on that until we get Mahout 0.12.0 out of the door
>> (targeted for this weekend).
>>
>> I would add Apache NiFi to the list.
>>
>> Future :
>>
>> Apache Mahout
>> Apache BigTop
>>
>> Openstack and Kubernetes (skunkworks)
>>
>>
>> On Wed, Apr 6, 2016 at 3:03 AM, Sebastian  wrote:
>>
>>> You should also add Apache Mahout, whose new Samsara DSL also runs on
>>> Flink.
>>>
>>> -s
>>>
>>> On 06.04.2016 08:50, Henry Saputra wrote:
>>>
 Thanks, Slim. I have just updated the wiki page with this entries.

 On Tue, Apr 5, 2016 at 10:20 AM, Slim Baltagi >>> > wrote:

 Hi

 The following are missing in the ‘Powered by Flink’ list:

   * *king.com 
 *
 https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces88
   * *Otto Group
 *
 http://data-artisans.com/how-we-selected-apache-flink-at-otto-group/
   * *Eura Nova *
 https://research.euranova.eu/flink-forward-2015-talk/
   * *Big Data Europe *http://www.big-data-europe.eu

 Thanks

 Slim Baltagi


 On Apr 5, 2016, at 10:08 AM, Robert Metzger  > wrote:
>
> Hi everyone,
>
> I would like to bring the "Powered by Flink" wiki page [1] to the
> attention of Flink user's who recently joined the Flink community.
> The list tracks which organizations are using Flink.
> If your company / university / research institute / ... is using
> Flink but the name is not yet listed there, let me know and I'll
> add the name.
>
> Regards,
> Robert
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink
>
>
> On Mon, Oct 19, 2015 at 4:10 PM, Matthias J. Sax  > wrote:
>
> +1
>
> On 10/19/2015 04:05 PM, Maximilian Michels wrote:
> > +1 Let's collect in the Wiki for now. At some point in time,
> we might
> > want to have a dedicated page on the Flink homepage.
> >
> > On Mon, Oct 19, 2015 at 3:31 PM, Timo Walther
> mailto:twal...@apache.org>> wrote:
> >> Ah ok, sorry. I think linking to the wiki is also ok.
> >>
> >>
> >> On 19.10.2015 15:18, Fabian Hueske wrote:
> >>>
> >>> @Timo: The proposal was to keep the list in the wiki (can
> be easily
> >>> extended) but link from the main website to the wiki page.
> >>>
> >>> 2015-10-19 15:16 GMT+02:00 Timo Walther
> mailto:twal...@apache.org>>:
> >>>
>  +1 for adding it to the website instead of wiki.
>  "Who is using Flink?" is always a question difficult to
> answer to
>  interested users.
> 
> 
>  On 19.10.2015 15:08, Suneel Marthi wrote:
> 
>  +1 to this.
> 
>  On Mon, Oct 19, 2015 at 3:00 PM, Fabian Hueske
> mailto:fhue...@gmail.com>> wrote:
> 
> > Sounds good +1
> >
> > 2015-10-19 14:57 GMT+02:00 Márton Balassi <
> mailto:balassi.mar...@gmail.com>>
> > balassi.mar...@gmail.com  balassi.mar...@gmail.com>>:
> >
> >> Thanks for starting and big +1 for making it more
> prominent.
> >>
> >> On Mon, Oct 19, 2015 at 2:53 PM, Fabian Hueske <
> mailto:fhue...@gmail.com>>
> >
> > fhue...@gmail.com > wrote:
> >>>
> >>> Thanks for starting this Kostas.
> >>>
> >>> I think the list is quite hidden in the wiki. Should
> we link from
> >>> flink.apache.org  to that
> page?
> >>>
> >>> Cheers, Fabian
> >>>
> >>> 2015-10-19 14:50 GMT+02:00 Kostas Tzoumas <
> mailto:ktzou...@apache.org>>
> >
> > ktzou...@apache.org >:
> 
>  Hi everyone,
> 
>  I start

Re: Powered by Flink

2016-04-12 Thread Stefano Baghino
Thanks to you! :)

On Tue, Apr 12, 2016 at 4:16 PM, Fabian Hueske  wrote:

> Done, thanks Stefano!
>
> 2016-04-12 15:38 GMT+02:00 Stefano Baghino 
> :
>
>> Hi Robert, thanks for bringing up the page.
>> We're in the process of releasing our first version and would like to be
>> added to the "Powered by" page: "Radicalbit  is an
>> OSS distribution that uses Flink for Fast Data processing"
>>
>> On Wed, Apr 6, 2016 at 9:07 AM, Suneel Marthi  wrote:
>>
>>> I was gonna hold off on that until we get Mahout 0.12.0 out of the door
>>> (targeted for this weekend).
>>>
>>> I would add Apache NiFi to the list.
>>>
>>> Future :
>>>
>>> Apache Mahout
>>> Apache BigTop
>>>
>>> Openstack and Kubernetes (skunkworks)
>>>
>>>
>>> On Wed, Apr 6, 2016 at 3:03 AM, Sebastian  wrote:
>>>
 You should also add Apache Mahout, whose new Samsara DSL also runs on
 Flink.

 -s

 On 06.04.2016 08:50, Henry Saputra wrote:

> Thanks, Slim. I have just updated the wiki page with this entries.
>
> On Tue, Apr 5, 2016 at 10:20 AM, Slim Baltagi  > wrote:
>
> Hi
>
> The following are missing in the ‘Powered by Flink’ list:
>
>   * *king.com 
> *
> https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces88
>   * *Otto Group
> *
> http://data-artisans.com/how-we-selected-apache-flink-at-otto-group/
>   * *Eura Nova *
> https://research.euranova.eu/flink-forward-2015-talk/
>   * *Big Data Europe *http://www.big-data-europe.eu
>
> Thanks
>
> Slim Baltagi
>
>
> On Apr 5, 2016, at 10:08 AM, Robert Metzger > > wrote:
>>
>> Hi everyone,
>>
>> I would like to bring the "Powered by Flink" wiki page [1] to the
>> attention of Flink user's who recently joined the Flink community.
>> The list tracks which organizations are using Flink.
>> If your company / university / research institute / ... is using
>> Flink but the name is not yet listed there, let me know and I'll
>> add the name.
>>
>> Regards,
>> Robert
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink
>>
>>
>> On Mon, Oct 19, 2015 at 4:10 PM, Matthias J. Sax <
>> mj...@apache.org
>> > wrote:
>>
>> +1
>>
>> On 10/19/2015 04:05 PM, Maximilian Michels wrote:
>> > +1 Let's collect in the Wiki for now. At some point in time,
>> we might
>> > want to have a dedicated page on the Flink homepage.
>> >
>> > On Mon, Oct 19, 2015 at 3:31 PM, Timo Walther
>> mailto:twal...@apache.org>> wrote:
>> >> Ah ok, sorry. I think linking to the wiki is also ok.
>> >>
>> >>
>> >> On 19.10.2015 15:18, Fabian Hueske wrote:
>> >>>
>> >>> @Timo: The proposal was to keep the list in the wiki (can
>> be easily
>> >>> extended) but link from the main website to the wiki page.
>> >>>
>> >>> 2015-10-19 15:16 GMT+02:00 Timo Walther
>> mailto:twal...@apache.org>>:
>> >>>
>>  +1 for adding it to the website instead of wiki.
>>  "Who is using Flink?" is always a question difficult to
>> answer to
>>  interested users.
>> 
>> 
>>  On 19.10.2015 15:08, Suneel Marthi wrote:
>> 
>>  +1 to this.
>> 
>>  On Mon, Oct 19, 2015 at 3:00 PM, Fabian Hueske
>> mailto:fhue...@gmail.com>> wrote:
>> 
>> > Sounds good +1
>> >
>> > 2015-10-19 14:57 GMT+02:00 Márton Balassi <
>> mailto:balassi.mar...@gmail.com>>
>> > balassi.mar...@gmail.com > balassi.mar...@gmail.com>>:
>> >
>> >> Thanks for starting and big +1 for making it more
>> prominent.
>> >>
>> >> On Mon, Oct 19, 2015 at 2:53 PM, Fabian Hueske <
>> mailto:fhue...@gmail.com>>
>> >
>> > fhue...@gmail.com > wrote:
>> >>>
>> >>> Thanks for starting this Kostas.
>> >>>
>> >>> I think the list is quite hidden in the wiki. Should
>> we link from
>> >>> flink.apache.org  to that
>> page?
>> >>>
>> >>> Cheers, Fabian
>> >>>
>> >>> 2015-10-19 14:50 GMT+02:00 Kostas Tzoumas <
>

Re: Flink ML 1.0.0 - Saving and Loading Models to Score a Single Feature Vector

2016-04-12 Thread Till Rohrmann
Sorry, I had a mistake in my example code. I thought the model would be
stored as a (Option[DataSet[Factors]], Option[DataSet[Factors]]) but
instead it’s stored as Option[(DataSet[Factors], DataSet[Factors])].

So the code should be

val als = ALS()

als.fit(input)

val alsModelOpt = als.factorsOption

val factorsTypeInfo = TypeInformation.of(classOf[Factors])
val factorsSerializer = factorsTypeInfo.createSerializer(new ExecutionConfig())
val outputFormat = new TypeSerializerOutputFormat[Factors]
outputFormat.setSerializer(factorsSerializer)

alsModelOpt match {
  case Some((userFactors, itemFactors)) =>
    userFactors.write(outputFormat, "user_path")
    itemFactors.write(outputFormat, "item_path")
  case None =>
}

if I’m not mistaken.

If you don’t see any output, then it might be the case that your model is
empty. Could you check that? You could for example simply call print on the
model DataSet.

Do you call env.execute at the end of your program? If you don’t do that,
then the job is not executed.
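
Concretely, the tail of the program should look something like this (a sketch;
the paths and the job name are placeholders):

alsModelOpt match {
  case Some((userFactors, itemFactors)) =>
    userFactors.write(outputFormat, "user_path")
    itemFactors.write(outputFormat, "item_path")
  case None =>
}

// the write(...) calls only declare sinks; nothing is materialized on disk
// until the batch job is actually run
env.execute("Persist ALS model")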

Cheers,
Till
​

On Tue, Apr 12, 2016 at 1:25 PM, KirstiLaurila 
wrote:

> Hi,
>
> those parts were examples how I had tried. I tried with your suggestions,
> but still no success. Additionally,
> there were some problems:
>
>
> val (userFactorsOpt, itemFactorsOpt) = als.factorsOption
>
> If I had just this, userFactorsOpt And itemFactorsOpt did not have write
> method. So I added get there i.e.
>
> val (userFactorsOpt, itemFactorsOpt) = als.factorsOption.get
>
>
> val factorsTypeInfo = TypeInformation.of(classOf[Factors])
> val factorsSerializer = factorsTypeInfo.createSerializer(new
> ExecutionConfig())
> val outputFormat = new TypeSerializerOutputFormat[Factors]
>
>
> Here, the factorsSerializer was not used at all, so I guess this was
> missing
> line
>
> outputFormat.setSerializer(factorsSerializer)
>
>
> userFactorsOpt match {
> case Some(userFactors) => userFactors.write(outputFormat, "user_path")
> case None =>
> }
>
>
> This doesn't run because of error message
>
> Error:(71, 12) constructor cannot be instantiated to expected type;
>  found   : Some[A]
>  required:
>
> org.apache.flink.api.scala.DataSet[org.apache.flink.ml.recommendation.ALS.Factors]
>   case Some(userFactors) => userFactorsOpt.write(outputFormat,
> "path_to_my_file")
>
> However, I still tried not to have match case i.e.
>
> userFactorsOpt.write(outputFormat, "path")
>
> but nothing was written anywhere.
>
>
>
>
>
>


RocksDB Statebackend

2016-04-12 Thread Konstantin Knauf
Hi everyone,

my experience with the RocksDBStateBackend has left me a little bit
confused. Maybe you guys can confirm that my experience is the expected
behaviour ;):

I have run a "performance test" twice, once with the FsStateBackend and once
with the RocksDBStateBackend for comparison. In this particular test the state
saved is generally not large (in a production scenario it will be larger).

These are my observations:

1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
to <<1MB with the FSStatebackend.

2. Throughput dropped from 28k/s -> 18k/s on a small cluster.

3. Checkpoint sizes as reported in the Dashboard were ca. 1MB for
FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
gets smaller for very large state. Can you confirm?

4. Checkpointing Times as reported in the Dashboard were 26secs for
RocksDB during the test and <1 second for FsStatebackend. Does the
reported time correspond to the sync. + asynchronous part of the
checkpointing in case of RocksDB? Is there any way to tell how long the
synchronous part takes?

From these first observations RocksDB does seem to bring a large
overhead for state < 1GB, I guess? Is this expected?

Cheers,

Konstantin


Re: RocksDB Statebackend

2016-04-12 Thread Aljoscha Krettek
Hi,
I'm going to try and respond to each point:

1. This seems strange, could you give some background on parallelism,
number of operators with state and so on? Also, I'm assuming you are using
the partitioned state abstraction, i.e. getState(), correct?

2. your observations are pretty much correct. The reason why RocksDB is
slower is that the FsStateBackend basically stores the state in a Java
HashMap and writes the contents to HDFS when checkpointing. RocksDB stores
data in on-disk files and goes to them for every state access (of course
there are caches, but generally it is like this). I'm actually impressed
that it is still this fast in comparison.

3. see 1. (I think for now)

4. The checkpointing time is the time from the JobManager deciding to start
a checkpoint until all tasks have confirmed that checkpoint. I have seen
this before and I think it results from back pressure. The problem is that
the checkpoint messages that we send through the topology are sitting at
the sources because they are also back pressured by the slow processing of
normal records. You should be able to see the actual checkpointing times
(both synchronous and asynchronous) in the log files of the task managers,
they should be very much lower.

I can go into details, I'm just writing this quickly before calling it a
day. :-)

Cheers,
Aljoscha
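
For readers who are not familiar with the partitioned state abstraction
mentioned in point 1, a minimal sketch of using getState() looks roughly like
this (the exact ValueStateDescriptor constructor varies between Flink
versions, so treat the details as illustrative):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// counts records per key using keyed (partitioned) state; both the
// FsStateBackend and the RocksDB backend checkpoint this state for you
class CountPerKey extends RichFlatMapFunction[(String, Long), (String, Long)] {

  private var count: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("count", classOf[Long], 0L))
  }

  override def flatMap(in: (String, Long), out: Collector[(String, Long)]): Unit = {
    val updated = count.value() + 1
    count.update(updated)
    out.collect((in._1, updated))
  }
}

// used on a keyed stream, e.g. stream.keyBy(_._1).flatMap(new CountPerKey)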

On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf 
wrote:

> Hi everyone,
>
> my experience with RocksDBStatebackend have left me a little bit
> confused. Maybe you guys can confirm that my epxierence is the expected
> behaviour ;):
>
> I have run a "performancetest" twice, once with FsStateBackend and once
> RocksDBStatebackend in comparison. In this particular test the state
> saved is generally not large (in a production scenario it will be larger.)
>
> These are my observations:
>
> 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
> to <<1MB with the FSStatebackend.
>
> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.
>
> 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for
> FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
> gets smaller for very large state. Can you confirm?
>
> 4. Checkpointing Times as reported in the Dashboard were 26secs for
> RocksDB during the test and <1 second for FsStatebackend. Does the
> reported time correspond to the sync. + asynchronous part of the
> checkpointing in case of RocksDB? Is there any way to tell how long the
> synchronous part takes?
>
> Form these first observations RocksDB does seem to bring a large
> overhead for state < 1GB, I guess? Is this expected?
>
> Cheers,
>
> Konstantin
>


Re: RocksDB Statebackend

2016-04-12 Thread Stephan Ewen
Concerning the size of RocksDB snapshots - I am wondering if RocksDB simply
does not compact for a long time, thus having a lot of stale data in the
snapshot.

That would be especially the case, if you have a lot of changing values for
the same set of keys.

On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek 
wrote:

> Hi,
> I'm going to try and respond to each point:
>
> 1. This seems strange, could you give some background on parallelism,
> number of operators with state and so on? Also, I'm assuming you are using
> the partitioned state abstraction, i.e. getState(), correct?
>
> 2. your observations are pretty much correct. The reason why RocksDB is
> slower is that the FsStateBackend basically stores the state in a Java
> HashMap and writes the contents to HDFS when checkpointing. RocksDB stores
> data in on-disk files and goes to them for every state access (of course
> there are caches, but generally it is like this). I'm actually impressed
> that it is still this fast in comparison.
>
> 3. see 1. (I think for now)
>
> 4. The checkpointing time is the time from the JobManager deciding to
> start a checkpoint until all tasks have confirmed that checkpoint. I have
> seen this before and I think it results from back pressure. The problem is
> that the checkpoint messages that we sent through the topology are sitting
> at the sources because they are also back pressured by the slow processing
> of normal records. You should be able to see the actual checkpointing times
> (both synchronous and asynchronous) in the log files of the task managers,
> they should be very much lower.
>
> I can go into details, I'm just writing this quickly before calling it a
> day. :-)
>
> Cheers,
> Aljoscha
>
> On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf <
> konstantin.kn...@tngtech.com> wrote:
>
>> Hi everyone,
>>
>> my experience with RocksDBStatebackend have left me a little bit
>> confused. Maybe you guys can confirm that my epxierence is the expected
>> behaviour ;):
>>
>> I have run a "performancetest" twice, once with FsStateBackend and once
>> RocksDBStatebackend in comparison. In this particular test the state
>> saved is generally not large (in a production scenario it will be larger.)
>>
>> These are my observations:
>>
>> 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
>> to <<1MB with the FSStatebackend.
>>
>> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.
>>
>> 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for
>> FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
>> gets smaller for very large state. Can you confirm?
>>
>> 4. Checkpointing Times as reported in the Dashboard were 26secs for
>> RocksDB during the test and <1 second for FsStatebackend. Does the
>> reported time correspond to the sync. + asynchronous part of the
>> checkpointing in case of RocksDB? Is there any way to tell how long the
>> synchronous part takes?
>>
>> Form these first observations RocksDB does seem to bring a large
>> overhead for state < 1GB, I guess? Is this expected?
>>
>> Cheers,
>>
>> Konstantin
>>
>


Re: RocksDB Statebackend

2016-04-12 Thread Maxim
Is it possible to add an option to store the state in a Java HashMap and
write its content to RocksDB when checkpointing? For "hot" keys that are
updated very frequently, such an optimization would help with performance.

I know that you are also working on incremental checkpoints, which would
also be a big win for jobs with a large number of keys.

Thanks,

Maxim.

On Tue, Apr 12, 2016 at 10:39 AM, Stephan Ewen  wrote:

> Concerning the size of RocksDB snapshots - I am wondering if RocksDB
> simply does not compact for a long time, thus having a lot of stale data in
> the snapshot.
>
> That would be especially the case, if you have a lot of changing values
> for the same set of keys.
>
> On Tue, Apr 12, 2016 at 6:41 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> I'm going to try and respond to each point:
>>
>> 1. This seems strange, could you give some background on parallelism,
>> number of operators with state and so on? Also, I'm assuming you are using
>> the partitioned state abstraction, i.e. getState(), correct?
>>
>> 2. your observations are pretty much correct. The reason why RocksDB is
>> slower is that the FsStateBackend basically stores the state in a Java
>> HashMap and writes the contents to HDFS when checkpointing. RocksDB stores
>> data in on-disk files and goes to them for every state access (of course
>> there are caches, but generally it is like this). I'm actually impressed
>> that it is still this fast in comparison.
>>
>> 3. see 1. (I think for now)
>>
>> 4. The checkpointing time is the time from the JobManager deciding to
>> start a checkpoint until all tasks have confirmed that checkpoint. I have
>> seen this before and I think it results from back pressure. The problem is
>> that the checkpoint messages that we sent through the topology are sitting
>> at the sources because they are also back pressured by the slow processing
>> of normal records. You should be able to see the actual checkpointing times
>> (both synchronous and asynchronous) in the log files of the task managers,
>> they should be very much lower.
>>
>> I can go into details, I'm just writing this quickly before calling it a
>> day. :-)
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 12 Apr 2016 at 18:21 Konstantin Knauf <
>> konstantin.kn...@tngtech.com> wrote:
>>
>>> Hi everyone,
>>>
>>> my experience with RocksDBStatebackend have left me a little bit
>>> confused. Maybe you guys can confirm that my epxierence is the expected
>>> behaviour ;):
>>>
>>> I have run a "performancetest" twice, once with FsStateBackend and once
>>> RocksDBStatebackend in comparison. In this particular test the state
>>> saved is generally not large (in a production scenario it will be
>>> larger.)
>>>
>>> These are my observations:
>>>
>>> 1. Minimal Checkpoint Size (no records) with RocksDB was 33MB compared
>>> to <<1MB with the FSStatebackend.
>>>
>>> 2. Throughput dropped from 28k/s -> 18k/s on a small cluster.
>>>
>>> 3. Checkpoint sizes as reported in the Dashboard was ca. 1MB for
>>> FsStatebackend but >100MB for RocksDbStatebackend. I hope the difference
>>> gets smaller for very large state. Can you confirm?
>>>
>>> 4. Checkpointing Times as reported in the Dashboard were 26secs for
>>> RocksDB during the test and <1 second for FsStatebackend. Does the
>>> reported time correspond to the sync. + asynchronous part of the
>>> checkpointing in case of RocksDB? Is there any way to tell how long the
>>> synchronous part takes?
>>>
>>> Form these first observations RocksDB does seem to bring a large
>>> overhead for state < 1GB, I guess? Is this expected?
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>
>


FoldFunction accumulator checkpointing

2016-04-12 Thread Michael Radford
I'm wondering whether the accumulator value maintained by a
FoldFunction is automatically checkpointed?

In general, but specifically when using the WindowedStream.apply
variant that takes a FoldFunction:

public <R> DataStream<R> apply(R initialValue,
  FoldFunction<T, R> foldFunction,
  WindowFunction<R, R, K, W> function,
  TypeInformation<R> evidence$7)

If not, then Flink 1.0.1 still has the issue that you can't pass a
RichFoldFunction to WindowedStream.apply
(java.lang.UnsupportedOperationException: ReduceFunction of apply can
not be a RichFunction).

But also, if not, it seems like this would be a common pattern when
doing complex (keyed / multi-valued) aggregations, and if the
accumulator type R is serializable, there could be a convenience
method for a checkpointed fold, like the mapWithState mentioned in the
State section of the streaming guide.
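
For what it's worth, the mapWithState shortcut mentioned above already gives a
checkpointed, fold-like running aggregate on a keyed stream. A minimal sketch
(field and job names are made up):

import org.apache.flink.streaming.api.scala._

case class Event(key: String, value: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val events: DataStream[Event] =
  env.fromElements(Event("a", 1), Event("a", 2), Event("b", 3))

// running sum per key; the Long accumulator lives in keyed state and is
// therefore included in checkpoints
val runningSums = events
  .keyBy(_.key)
  .mapWithState[(String, Long), Long] { (event, acc: Option[Long]) =>
    val sum = acc.getOrElse(0L) + event.value
    ((event.key, sum), Some(sum))
  }

runningSums.print()
env.execute("checkpointed running sum sketch")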

Thanks,
Mike


Re: DataSet.randomSplit()

2016-04-12 Thread Trevor Grant
Hey all,

Sorry I missed this thread.

The related issue is: https://issues.apache.org/jira/browse/FLINK-2259

I checked it out then forgot about it.  I'm cranking on it now.

tg



Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

*"Fortunate is he, who is able to know the causes of things."  -Virgil*


On Tue, Mar 29, 2016 at 4:33 AM, Till Rohrmann  wrote:

> Hi,
>
> I think Ufuk is completely right. As far as I know, we don't support this
> function and nobody's currently working on it. If you like, then you could
> take the lead there.
>
> Cheers,
> Till
>
> On Mon, Mar 28, 2016 at 10:50 PM, Ufuk Celebi  wrote:
>
>> Hey Gna! I think that it's not on the road map at the moment. Feel free
>> to ping in the linked PR though. Probably Till can chime in there.
>>
>> – Ufuk
>>
>> On Mon, Mar 28, 2016 at 5:16 PM, Sourigna Phetsarath <
>> gna.phetsar...@teamaol.com> wrote:
>>
>>> Ufuk,
>>>
>>> Thank you.  Yes, I saw the sampling methods in DataSetUtils and they are
>>> helpful.
>>>
>>> Just wanted to see if that particular method is on the road map for a
>>> future release.
>>>
>>> -Gna
>>>
>>> On Mon, Mar 28, 2016 at 6:22 AM, Ufuk Celebi  wrote:
>>>
 Hey Sourigna,

 that particular method is not part of Flink yet.

 Did you have a look at the sampling methods in DataSetUtils? Maybe they
 can be helpful for what you are trying to achieve.

 – Ufuk

 On Wed, Mar 23, 2016 at 5:19 PM, Sourigna Phetsarath <
 gna.phetsar...@teamaol.com> wrote:

> All:
>
> Does Flink DataSet have a randomSplit(weights:Array[Double], seed:
> Long): Array[DataSet[T]] function?
>
> There is this pull request: https://github.com/apache/flink/pull/921
>
> Does anyone have an update of the progress of this?
>
> Thank you.
>
> --
>
>
> *Gna Phetsarath*System Architect // AOL Platforms // Data Services //
> Applied Research Chapter
> 770 Broadway, 5th Floor, New York, NY 10003
> o: 212.402.4871 // m: 917.373.7363
> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>
> * *
>


>>>
>>>
>>> --
>>>
>>>
>>> *Gna Phetsarath*System Architect // AOL Platforms // Data Services //
>>> Applied Research Chapter
>>> 770 Broadway, 5th Floor, New York, NY 10003
>>> o: 212.402.4871 // m: 917.373.7363
>>> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>>>
>>> * *
>>>
>>
>>
>


Monitoring and alerting mechanisms for Flink on YARN

2016-04-12 Thread Soumya Simanta
We are about to deploy a Flink job on YARN in production. Given that it is
a long running process we want to have alerting and monitoring mechanisms
in place.

 Any existing solutions or suggestions to implement a new one would be
appreciated.

Thanks!


Big Data Interview Preparator

2016-04-12 Thread Chaturvedi Chola
Hello

The below is a very good book on Big Data for interview preparation.

A good book on big data interview FAQ
http://www.amazon.in/Big-Data-Interview-FAQs-Chinnasamy/dp/9386009188/ref=sr_1_1?ie=UTF8&qid=1459943243&sr=8-1&keywords=9789386009180


http://www.flipkart.com/big-data-interview-faqs-english/p/itmehg3s662h7zay?pid=9789386009180&affid=editornoti


https://notionpress.com/read/big-data-interview-faqs


Thanks,
Chaturvedi.


"Read size does not match expected size" error when using HyperLogLog

2016-04-12 Thread Hironori Ogibayashi
Hello,

I am trying to use the HyperLogLog from
stream-lib (https://github.com/addthis/stream-lib)
in my Flink streaming job, but when I submit the job, I get the
following error. My Flink version is 1.0.1.

---
org.apache.flink.client.program.ProgramInvocationException: The
program execution failed: Job execution failed.
(...)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
(...)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot instantiate user function.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:209)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:186)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Read size does not match expected size.
at 
org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:291)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.readObject(WindowOperator.java:182)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:194)
... 3 more


It looks like a serialization/deserialization problem. The length written by
StateDescriptor.writeObject() and the actual length in readObject()
differ? I have no idea why this happens.

The code is this:

---
val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.fromElements((1,"a"),(1,"b"),(1,"c"),(1,"d"))
stream
  .keyBy(0)
  .countWindow(3)
  .fold(new HyperLogLog(20)){(r,i) => r.offer(i._2); r}
  .map{x => x.cardinality()}

stream.print

env.execute("HLLTest")
---

Any help would be appreciated.

Regards,
Hionori