Re: Counting latest state of stateful entities in streaming

2016-09-30 Thread Simone Robutti
I'm working with your suggestions, thank you very much. What I'm missing
here is what YourWindowFunction should do. I have no notion of event time
there, so I can't assign a timestamp. Also, this solution seems to work on
processing time, while I care about event time. I couldn't make it run yet,
but from what I understood this is slightly different from what I need.
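
For reference, a sketch of what the aggregation step from Fabian's reply below could
look like with event time (a sketch only: it assumes the Scala DataStream API with
event time enabled and timestamps/watermarks assigned at the source, and exact method
signatures may differ slightly across Flink versions). The point is that the window
function needs no per-record event time; it can take the timestamp from the window
itself, e.g. its end:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// stateChanges: DataStream[(Int, Int)] -- (state, cntUpdate), as in the reply below
def countChangesPerWindow(stateChanges: DataStream[(Int, Int)]): DataStream[(Int, Int, Long)] =
  stateChanges
    .keyBy(_._1)                      // key by state
    .timeWindow(Time.minutes(1))      // tumbling event-time window, illustrative size
    .apply(
      // "SumReducer": sum the cntUpdates per state
      (a: (Int, Int), b: (Int, Int)) => (a._1, a._2 + b._2),
      // "YourWindowFunction": attach the window end as the event-time timestamp
      (key: Int, window: TimeWindow, in: Iterable[(Int, Int)], out: Collector[(Int, Int, Long)]) =>
        out.collect((key, in.iterator.next()._2, window.getEnd))
    )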

2016-09-30 10:04 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Simone,
>
> I think I have a solution for your problem:
>
> val s: DataStream[(Long, Int, Long)] = ??? // (id, state, time)
>
> val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
>   .keyBy(_._1) // key by id
>   .flatMap(new StateUpdater) // StateUpdater is a stateful
> FlatMapFunction. It has a keyed state that stores the last state of each
> id. For each input record it returns two records: (oldState, -1),
> (newState, +1)
>
> stateChanges ensures that counts of previous states are subtracted.
>
> val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges //
> (state, cntUpdate, time)
>   .keyBy(_._1) // key by state
>   .window() // your window, should be non-overlapping, so go for instance
> for Tumbling
>   .apply(new SumReducer(), new YourWindowFunction()) // SumReducer sums
> the cntUpdates and YourWindowFunction assigns the timestamp of your window
>
> this step aggregates all state changes for each state in a window
>
> val stateCnts: DataStream[(Int, Int, Long)] = changesPerWindow // (state,
> count, time)
>   .keyBy(_._1) // key by state again
>   .map(new CountUpdater) // CountUpdater is a stateful MapFunction. It has
> a keyed state that stores the current count. For each incoming record, the
> count is adjusted and a record (state, newCount, time) is emitted.
>
> Now you have the new counts for your states in multiple records. If
> possible, you can update your Elasticsearch index using these. Otherwise,
> you have to collect them into one record using another window.
>
> Also note that the state size of this program depends on the number of
> unique ids. That might cause problems if the id space grows very fast.
>
> Please let me know, if you have questions or if that works ;-)
>
> Cheers, Fabian
>
>
> 2016-09-30 0:32 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>
>> Hello,
>>
>> in the last few days I tried to create my first real-time analytics job
>> in Flink. The approach is kappa-architecture-like, so I have my raw data on
>> Kafka where we receive a message for every change of state of any entity.
>>
>> So the messages are of the form
>>
>> (id,newStatus, timestamp)
>>
>> We want to compute, for every time window, the count of items in a given
>> status. So the output should be of the form
>>
>> (outputTimestamp, state1:count1,state2:count2 ...)
>>
>> or equivalent. These rows should contain, at any given time, the count of
>> the items in a given status, where the status associated with an id is the
>> most recent message observed for that id. The status for an id should be
>> counted in any case, even if the event is way older than those getting
>> processed. So the sum of all the counts should be equal to the number of
>> different IDs observed in the system. The following step could be
>> forgetting about the items in a final state after a while, but this is not a
>> strict requirement right now.
>>
>> This will be written on elasticsearch and then queried.
>>
>> I tried many different paths and none of them completely satisfied the
>> requirement. Using a sliding window I could easily achieve the expected
>> behaviour, except that when the beginning of the sliding window surpassed
>> the timestamp of an event, it was lost for the count, as you may expect.
>> Other approaches failed to be consistent when working with a backlog
>> because I did some tricks with keys and timestamps that failed when the
>> data was processed all at once.
>>
>> So I would like to know, even at a high level, how I should approach
>> this problem. It looks like a relatively common use case, but the fact that
>> the relevant information for a given ID must be retained indefinitely to
>> count the entities correctly creates a lot of problems.
>>
>> Thank you in advance,
>>
>> Simone
>>
>>
>
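
For reference, a rough sketch of the StateUpdater step from Fabian's reply above,
written with the Scala flatMapWithState helper (a sketch only, assuming the tuple
types from the outline; a stateful RichFlatMapFunction with a ValueState would work
the same way):

import org.apache.flink.streaming.api.scala._

// s: DataStream[(Long, Int, Long)] -- (id, state, time)
def toStateChanges(s: DataStream[(Long, Int, Long)]): DataStream[(Int, Int)] =
  s.keyBy(_._1)                                    // key by id
    .flatMapWithState[(Int, Int), Int] { (in, lastState) =>
      val newState = in._2
      lastState match {
        // first event for this id: only count the new state
        case None           => (Seq((newState, +1)), Some(newState))
        // later events: retract the old state, count the new one
        case Some(oldState) => (Seq((oldState, -1), (newState, +1)), Some(newState))
      }
    }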


Re: Merge the states of different partition in streaming

2016-09-28 Thread Simone Robutti
Solved. Probably there was an error in the way I was testing. Also I
simplified the job and it works now.

2016-09-27 16:01 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:

> Hello,
>
> I'm dealing with an analytical job in streaming and I don't know how to
> write the last part.
>
> Actually I want to count all the elements in a window with a given status,
> so I keep a state with a Map[Status,Long]. This state is updated starting
> from tuples containing the oldStatus and the newStatus. So every event
> generates a +1 for the new status and a -1 for the old status. Then I want
> to reduce all these counts and move from a local and partial state to a
> global state that will be written in output.
>
> Right now my code looks like:
>
> filteredLatestOrders.keyBy(x => x._1.getStatus).mapWithState(
> updateState).keyBy(x=>"0").reduce((orderA,orderB)=>orderA.sum(orderB))
>
> where "filteredLatestOrder" is a DataStream containing informations about
> the elements, the new state and the old state.
>
> This produces in output:
>
> 2> Map(DRAFT -> 0, RESERVED -> 1, PICKED -> 0, PACKED -> 0)
> 2> Map(DRAFT -> 0, RESERVED -> 2, PICKED -> 0, PACKED -> 0)
> 2> Map(DRAFT -> 0, RESERVED -> 3, PICKED -> 0, PACKED -> 1)
> 3> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 1, PACKED -> 0)
> 4> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 0, PACKED -> 1)
>
> I thought that keying with a fixed value would collect all the elements in
> a single node so that I could finally compute the final result, but they
> are left on different nodes and are never summed.
>
> Is this the correct approach? In that case, how can I do what I need? Is
> there a smarter way to count distinct evolving elements by their status in
> a stream? Mind that the original source of events is updates to the
> status of an element and the requirement is that I want to count only the
> latest status available.
>
> Thank you in advance,
>
> Simone
>


Merge the states of different partition in streaming

2016-09-27 Thread Simone Robutti
Hello,

I'm dealing with an analytical job in streaming and I don't know how to
write the last part.

Actually I want to count all the elements in a window with a given status,
so I keep a state with a Map[Status,Long]. This state is updated starting
from tuples containing the oldStatus and the newStatus. So every event
generates a +1 for the new status and a -1 for the old status. Then I want
to reduce all these counts and move from a local and partial state to a
global state that will be written in output.

Right now my code looks like:

filteredLatestOrders
  .keyBy(x => x._1.getStatus)
  .mapWithState(updateState)
  .keyBy(x => "0")
  .reduce((orderA, orderB) => orderA.sum(orderB))

where "filteredLatestOrder" is a DataStream containing informations about
the elements, the new state and the old state.

This produces in output:

2> Map(DRAFT -> 0, RESERVED -> 1, PICKED -> 0, PACKED -> 0)
2> Map(DRAFT -> 0, RESERVED -> 2, PICKED -> 0, PACKED -> 0)
2> Map(DRAFT -> 0, RESERVED -> 3, PICKED -> 0, PACKED -> 1)
3> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 1, PACKED -> 0)
4> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 0, PACKED -> 1)

I thought that keying with a fixed value would collect all the elements in
a single node so that I could finally compute the final result, but they
are left on different nodes and are never summed.
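
For illustration, one shape this final step can take (a sketch only, reusing the
snippet above; the window size is illustrative and sum is assumed to be the same
method as in the snippet): keying by a constant sends all partial maps to one task,
and merging them per window emits a single combined map per window instead of a
running partial result per element:

import org.apache.flink.streaming.api.windowing.time.Time

filteredLatestOrders
  .keyBy(x => x._1.getStatus)
  .mapWithState(updateState)
  .keyBy(x => "0")                   // single key: all partial maps go to one task
  .timeWindow(Time.seconds(10))      // illustrative window size
  .reduce((orderA, orderB) => orderA.sum(orderB))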

Is this the correct approach? In that case, how can I do what I need? Is
there a smarter way to count distinct evolving elements by their status in
a stream? Mind that the original source of events is updates to the
status of an element and the requirement is that I want to count only the
latest status available.

Thank you in advance,

Simone


Discard message LeaderSessionMessage(null,ConnectionTimeout)

2016-09-13 Thread Simone Robutti
Hello,

while running a job on Flink 1.1.2 on a cluster of 3 nodes using the
KafkaProducer010, I encounter this error:

WARN  org.apache.flink.runtime.client.JobClientActor-
Discard message LeaderSessionMessage(null,ConnectionTimeout) because the
expected leader session ID 4a1c16fe-d015-4351-81ea-814796e2167f did not
equal the received leader session ID null.

The KafkaProducer010 works locally but on the cluster fails to read from
the topic. I can correctly read and write using console producer and
consumer so I don't think it's misconfigured.

I would like to know what could be the cause of the error, if it is related
to the issue I'm facing on the flink-kafka connector and maybe if it's due
to the upgrade to version 0.10. I know the producer is still in a PR,
but it has worked properly in a local environment.

Thank you,

Simone


Re: Storing JPMML-Model Object as a Variable Closure?

2016-09-05 Thread Simone Robutti
>The only drawback is that the Evaluator gets initialized once per
Function-Instance.

I believe that reducing the number of function instances is something that
should be handled by Flink's runtime and that's why I've implemented the
solution this way. In our tests the number of instances was minimal but
this is still extremely experimental so take it with a grain of salt.

I believe that this is highly dependent on the expected size of the PMML
models though.

2016-09-05 16:33 GMT+02:00 Bauss, Julian <julian.ba...@bonprix.net>:

> Hi Simone,
>
>
>
> thank you for your feedback!
>
> The code snippet you provided works fine.
>
>
>
> The only drawback is that the Evaluator gets initialized once per
> Function-Instance.
>
> That could lead to high memory consumption depending on the level of
> parallelism
>
> and the size of the PMML-Model (which can get quite big).
>
>
>
> The „obvious“ optimization would be to initialize and hide the Evaluator
> behind a singleton since it
>
> is thread safe. (Which is what I wanted to avoid in the first place. But
> maybe that is the best solution
>
> at the moment?)
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
>
> *Von:* Simone Robutti [mailto:simone.robu...@radicalbit.io]
> *Gesendet:* Montag, 5. September 2016 15:42
>
> *An:* user@flink.apache.org
> *Betreff:* Re: Storing JPMML-Model Object as a Variable Closure?
>
>
>
> Yes, sorry but it's private and I just discovered we don't want to release
> it as public yet. This piece of code could help you though:
> https://gist.github.com/chobeat/f07221357a2e3f9efa377e4cb0479f92
>
>
>
> Ignore all the stuff about the strategies. The important part is the
> `open` method and the transient var. This is used to load the PMML file and
> instantiate all the JPMML objects when you instantiate the Flink operator. The
> variable `pmmlSource` is a string, but you can replace that part with a load
> from HDFS or another FS if you want every node to load the .jpmml file in
> parallel and be in control of that part.
>
>
>
>
>
>
>
> 2016-09-05 15:24 GMT+02:00 Bauss, Julian <julian.ba...@bonprix.net>:
>
> Hi Simone,
>
>
>
> that sounds promising!
>
> Unfortunately your link leads to a 404 page.
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
>
> *Von:* Simone Robutti [mailto:simone.robu...@radicalbit.io]
> *Gesendet:* Montag, 5. September 2016 14:59
>
>
> *An:* user@flink.apache.org
> *Betreff:* Re: Storing JPMML-Model Object as a Variable Closure?
>
>
>
> I think you could make use of this small component I've developed:
> https://gitlab.com/chobeat/Flink-JPMML
>
>
>
> It's specifically for using JPMML on Flink. Maybe there's too much stuff
> for what you need but you could reuse the code of the Operator to do what
> you need.
>
>
>
> 2016-09-05 14:11 GMT+02:00 Bauss, Julian <julian.ba...@bonprix.net>:
>
> Hi Stephan,
>
>
>
> thanks for your reply!
>
> It seems as if I can only use broadcast variables on DataSet-Operators
> (using myFunc.withBroadcastSet(…))
>
> Is that right?
>
>
>
> I am working on a DataStream, though. Do streams offer similar
> functionality?
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
>
> *Von:* Stephan Ewen [mailto:se...@apache.org]
> *Gesendet:* Freitag, 2. September 2016 15:27
> *An:* user@flink.apache.org
> *Betreff:* Re: Storing JPMML-Model Object as a Variable Closure?
>
>
>
> How about using a source and broadcast variable?
>
>
>
> You could write the model to the storage (DFS), then read it with a source
> and use a broadcast variable to send it to all tasks.
>
> A single record can be very large, so it should work even if your model is
> quite big.
>
>
>
> Does that sound feasible?
>
>
>
> In future versions of flink, you may be able to skip the "write to DFS"
> step and simply have the model in a collection source (when large RPC
> messages are supported).
>
>
>
> Best,
>
> Stephan
>
>
>
>
>
>
>
> On Fri, Sep 2, 2016 at 11:20 AM, Bauss, Julian <julian.ba...@bonprix.net>
> wrote:
>
> Hello Everybody,
>
>
>
> I’m currently refactoring some code and am looking for a better
> alternative to handle
>
> JPMML-Models in data streams. At the moment the flink job I’m working on
> references a model-object
>
> as a Singleton which I want to change because static references tend to
> cause problems in distributed systems.
>
>
>
> I thought about handing the model-object to the function that uses it as a
> variable closure. The 

Re: Storing JPMML-Model Object as a Variable Closure?

2016-09-05 Thread Simone Robutti
Yes, sorry but it's private and I just discovered we don't want to release
it as public yet. This piece of code could help you though:
https://gist.github.com/chobeat/f07221357a2e3f9efa377e4cb0479f92

Ignore all the stuff about the strategies. The important part is the `open`
method and the transient var. This is used to load the PMML file and
instantiate all the JPMML objects when you instantiate the Flink operator. The
variable `pmmlSource` is a string, but you can replace that part with a load
from HDFS or another FS if you want every node to load the .jpmml file in
parallel and be in control of that part.
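
For reference, a minimal sketch of that pattern, separate from the gist (the evaluator
type and the buildEvaluator helper are placeholders, not the actual JPMML calls): a
RichMapFunction with a transient field that is rebuilt in open() on every parallel task.

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class PmmlScorer(pmmlSource: String)
    extends RichMapFunction[Map[String, Any], Map[String, Any]] {

  // not serialized with the function; rebuilt on every worker
  @transient private var evaluator: AnyRef = _

  override def open(parameters: Configuration): Unit = {
    // build the JPMML evaluator from the PMML string (or load it from HDFS) here
    evaluator = buildEvaluator(pmmlSource)
  }

  override def map(in: Map[String, Any]): Map[String, Any] = {
    // score the record with the evaluator; identity is just a placeholder
    in
  }

  private def buildEvaluator(pmml: String): AnyRef = ??? // hypothetical helper
}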



2016-09-05 15:24 GMT+02:00 Bauss, Julian <julian.ba...@bonprix.net>:

> Hi Simone,
>
>
>
> that sounds promising!
>
> Unfortunately your link leads to a 404 page.
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
>
> *Von:* Simone Robutti [mailto:simone.robu...@radicalbit.io]
> *Gesendet:* Montag, 5. September 2016 14:59
>
> *An:* user@flink.apache.org
> *Betreff:* Re: Storing JPMML-Model Object as a Variable Closure?
>
>
>
> I think you could make use of this small component I've developed:
> https://gitlab.com/chobeat/Flink-JPMML
>
>
>
> It's specifically for using JPMML on Flink. Maybe there's too much stuff
> for what you need but you could reuse the code of the Operator to do what
> you need.
>
>
>
> 2016-09-05 14:11 GMT+02:00 Bauss, Julian <julian.ba...@bonprix.net>:
>
> Hi Stephan,
>
>
>
> thanks for your reply!
>
> It seems as if I can only use broadcast variables on DataSet-Operators
> (using myFunc.withBroadcastSet(…))
>
> Is that right?
>
>
>
> I am working on a DataStream, though. Do streams offer similar
> functionality?
>
>
>
> Best Regards,
>
>
>
> Julian
>
>
>
> *Von:* Stephan Ewen [mailto:se...@apache.org]
> *Gesendet:* Freitag, 2. September 2016 15:27
> *An:* user@flink.apache.org
> *Betreff:* Re: Storing JPMML-Model Object as a Variable Closure?
>
>
>
> How about using a source and broadcast variable?
>
>
>
> You could write the model to the storage (DFS), then read it with a source
> and use a broadcast variable to send it to all tasks.
>
> A single record can be very large, so it should work even if your model is
> quite big.
>
>
>
> Does that sound feasible?
>
>
>
> In future versions of flink, you may be able to skip the "write to DFS"
> step and simply have the model in a collection source (when large RPC
> messages are supported).
>
>
>
> Best,
>
> Stephan
>
>
>
>
>
>
>
> On Fri, Sep 2, 2016 at 11:20 AM, Bauss, Julian <julian.ba...@bonprix.net>
> wrote:
>
> Hello Everybody,
>
>
>
> I’m currently refactoring some code and am looking for a better
> alternative to handle
>
> JPMML-Models in data streams. At the moment the flink job I’m working on
> references a model-object
>
> as a Singleton which I want to change because static references tend to
> cause problems in distributed systems.
>
>
>
> I thought about handing the model-object to the function that uses it as a
> variable closure. The object
>
> can be between 16MB and 250MB in size (depending on the depth of the
> decision tree).
>
>
>
> According to https://cwiki.apache.org/confluence/display/FLINK/
> Variables+Closures+vs.+Broadcast+Variables that’s way too large though.
>
> Are there any viable alternatives or would this be the „right way“ to
> handle this situation?
>
>
>
> Best Regards,
>
>
>
> Julian
>
>

Re: env.fromElements produces TypeInformation error

2016-06-04 Thread Simone Robutti
I'm not sure if this is the solution and I don't have the possibility to
try right now, but you should move the case class "State" definition
outside the abstract class.
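
For illustration, a minimal sketch of the suggested layout (with a simplified stand-in
for the real State class): once the case class sits at the top level and
org.apache.flink.api.scala._ is imported, the implicit TypeInformation needed by
fromElements can be derived.

import org.apache.flink.api.scala._

// top-level case class, not nested inside an abstract class
case class State(x: Double, value: Double, iter: Int)

object FromElementsExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val states: DataSet[State] = env.fromElements(State(0.0, 1.0, 0))
    states.print()
  }
}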

2016-06-04 17:34 GMT+02:00 Dan Drewes :

>
> Hi,
>
> compiling the code:
>
> def minimize(f: DF, init: T): T = {
>
>   // create execution environment
>   val env = ExecutionEnvironment.getExecutionEnvironment
>
>   val initialstate = initialState(f, init)
>   val iterativestate = env.fromElements(initialstate).iterate(1) {
>     iterationInput: DataSet[State] =>
>       val result = iterationInput.map {
>         oldstate => computeNextState(adjustedFun, oldstate)
>       }
>       result
>   }
> }
>
> object IterationsFirstOrderMinimizer {
>   case class State[+T, +ConvergenceInfo, +History](x: T,
>                                                     value: Double, grad: T,
>                                                     adjustedValue: Double,
>                                                     adjustedGradient: T,
>                                                     iter: Int,
>                                                     initialAdjVal: Double,
>                                                     history: History,
>                                                     convergenceInfo: ConvergenceInfo,
>                                                     searchFailed: Boolean = false) {
>   }
>
> ... fails with the error:
>
>   could not find implicit value for evidence parameter of type
>   org.apache.flink.api.common.typeinfo.TypeInformation[IterationsFirstOrderMinimizer.this.State]
>     val iterativestate = env.fromElements(initialState(f, init)).iterate(1) {
>                              ^
>
> Google only told me to import org.apache.flink.api.scala._ which I do. The
> other suggested solution for generic methods (
> https://ci.apache.org/projects/flink/flink-docs-master/internals/types_serialization.html)
> does also not apply because it's the fromElements call that produces the
> error. I am very new to Flink and Scala and because I don't know if the
> code I posted above is enough to say what I'm doing wrong, you can find the
> complete scala file attached. Thanks in advance for any ideas and hints.
> Best, Dan
>
>


Re: sparse matrix

2016-05-30 Thread Simone Robutti
Hello,

right now Flink's local matrices are rather raw, and for this kind of usage
you should rely on Breeze. If you need to perform operations, slicing in
this case, Breeze is a better option if you don't want to reimplement
everything.

In case you already developed against Flink's matrices, there's a useful
converter in org.apache.flink.ml.math.Breeze.
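
For illustration, a small sketch of that round trip (assuming FlinkML is on the
classpath; asBreeze/fromBreeze are the implicit conversions provided by
org.apache.flink.ml.math.Breeze):

import org.apache.flink.ml.math.SparseMatrix
import org.apache.flink.ml.math.Breeze._

// build a small sparse matrix and convert it to a Breeze matrix for slicing etc.
val flinkMatrix = SparseMatrix.fromCOO(3, 3, Seq((0, 0, 1.0), (1, 2, 2.0), (2, 1, 3.0)))
val breezeMatrix = flinkMatrix.asBreeze   // breeze.linalg.Matrix[Double]
// slice or drop rows/columns with Breeze's API, then convert back with .fromBreeze if needed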

2016-05-30 4:17 GMT+02:00 Lydia Ickler :

> Hi all,
>
> I have two questions regarding sparse matrices:
>
> 1. I have a sparse Matrix: val sparseMatrix = SparseMatrix.fromCOO(row,
> col, csvInput.collect())
> and now I would like to extract all values that are in a specific row X.
> How would I tackle that? flatMap() and filter() do not seem to be
> supported in that case.
>
> 2. I want to drop/delete one specific row and column from the matrix and
> therefore also reduce the dimension.
> How is the smartest way to do so?
>
> Thanks in advance!
> Lydia
>


Re: Merging sets with common elements

2016-05-25 Thread Simone Robutti
@Till:

A more meaningful example would be the following: from
{(1,{1,2}), (2,{2,3}), (3,{4,5}), (4,{1,27})} the result should be
{1,2,3,27},{4,5} because the sets #1, #2 and #4 have at least one element in
common.

If you see this as a graph where the elements of the sets are nodes and a
set expresses a full connection among all the elements of said set, you can
see the result as the connected components of the graph.
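
For illustration, a sketch of that reduction on the DataSet API (names are
illustrative): each set becomes a star of edges around one of its elements, and
running connected components on those edges (e.g. with Gelly or Flink's
ConnectedComponents example) yields the merged sets.

import org.apache.flink.api.scala._

val sets: DataSet[(Int, Set[Int])] = ??? // (setId, keys), as described above

// connect every element of a set to the set's first element:
// elements that share a set end up in the same connected component
val edges: DataSet[(Int, Int)] = sets.flatMap { case (_, elements) =>
  val anchor = elements.head
  elements.map(e => (anchor, e))
}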


2016-05-25 11:42 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

> Hi Simone,
>
> could you elaborate a little bit on the actual operation you want to
> perform. Given a data set {(1, {1,2}), (2, {2,3})} what's the result of
> your operation? Is the result { ({1,2}, {1,2,3}) } because the 2 is
> contained in both sets?
>
> Cheers,
> Till
>
> On Wed, May 25, 2016 at 10:22 AM, Simone Robutti <
> simone.robu...@radicalbit.io> wrote:
>
>> Hello,
>>
>> I'm implementing MinHash for recommendation on Flink. I'm almost done but
>> I need an efficient way to merge sets of similar keys together (and later
>> join these sets of keys with more data).
>>
>> The actual data structure is of the form DataSet[(Int,Set[Int])] where
>> the left element of the tuple is an ID for the right element, that is a set
>> of keys. I want to merge these sets together only if they share at least
>> one element.
>>
>> I'm fairly sure I have studied the efficient solution to this problem in
>> a local environment but I don't really know how to treat it in a
>> distributed environment. Any suggestion?
>>
>> Thanks,
>>
>> Simone
>>
>
>


Merging sets with common elements

2016-05-25 Thread Simone Robutti
Hello,

I'm implementing MinHash for recommendation on Flink. I'm almost done but I
need an efficient way to merge sets of similar keys together (and later
join these sets of keys with more data).

The actual data structure is of the form DataSet[(Int,Set[Int])] where the
left element of the tuple is an ID for the right element, that is a set of
keys. I want to merge these sets together only if they share at least one
element.

I'm fairly sure I have studied the efficient solution to this problem in a
local environment but I don't really know how to treat it in a distributed
environment. Any suggestion?

Thanks,

Simone


Re: Bug while using Table API

2016-05-12 Thread Simone Robutti
Ok, I tested it and it works on the same example. :)

2016-05-11 12:25 GMT+02:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

> Hi Simone,
>
> Fabian has pushed a fix for the streaming TableSources that removed the
> Calcite Stream rules [1].
> The reported error does not appear anymore with the current master. Could
> you please also give it a try and verify that it works for you?
>
> Thanks,
> -Vasia.
>
> [1]:
> https://github.com/apache/flink/commit/7ed07933d2dd3cf41948287dc8fd79dbef902311
>
> On 4 May 2016 at 17:33, Vasiliki Kalavri <vasilikikala...@gmail.com>
> wrote:
>
>> Thanks Simone! I've managed to reproduce the error. I'll try to figure
>> out what's wrong and I'll keep you updated.
>>
>> -Vasia.
>> On May 4, 2016 3:25 PM, "Simone Robutti" <simone.robu...@radicalbit.io>
>> wrote:
>>
>>> Here is the code:
>>>
>>> package org.example
>>>
>>> import org.apache.flink.api.scala._
>>> import org.apache.flink.api.table.TableEnvironment
>>>
>>> object Job {
>>>   def main(args: Array[String]) {
>>> // set up the execution environment
>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>>
>>>
>>> val input = env.fromElements(WC("hello", 1), WC("hello", 1),
>>> WC("ciao", 1))
>>> val expr = tEnv.fromDataSet(input)
>>> val result = expr
>>>   .groupBy("word")
>>>   .select("word , count.sum as count")
>>> tEnv.toDataSet[WC](result).print()
>>>
>>> env.execute("Flink Scala API Skeleton")
>>>   }
>>> }
>>>
>>> case class WC(word:String,count:Int)
>>>
>>>
>>>
>


Re: Using FlinkML algorithms in Streaming

2016-05-11 Thread Simone Robutti
Actually, model portability and persistence are a serious limitation to the
practical use of FlinkML in streaming. If you know what you're doing, you
can write a blunt serializer for your model, write it to a file and rebuild
the model stream-side from the deserialized information.

I tried it for an SVM model and there were no obstacles. It's ugly but it
works.
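
For illustration, a rough sketch of that "blunt serializer" idea with the model reduced
to a weight array (how the weights are extracted depends on the FlinkML version; for
SVM they are exposed through weightsOption). It is plain Java serialization to a file
that both the batch and the streaming job can reach:

import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

def saveWeights(weights: Array[Double], path: String): Unit = {
  val out = new ObjectOutputStream(new FileOutputStream(path))
  try out.writeObject(weights) finally out.close()
}

def loadWeights(path: String): Array[Double] = {
  val in = new ObjectInputStream(new FileInputStream(path))
  try in.readObject().asInstanceOf[Array[Double]] finally in.close()
}

// stream side: rebuild a linear scorer from the deserialized weights
def score(w: Array[Double], x: Array[Double]): Double =
  w.zip(x).map { case (wi, xi) => wi * xi }.sum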

2016-05-11 16:18 GMT+02:00 Márton Balassi :

> Currently I am not aware of streaming learners support, you would need to
> implement that yourself at this point.
>
> As for streaming predictors for batch learners I have some preview code
> that you might like. [1]
>
> [1]
> https://github.com/streamline-eu/ML-Pipelines/blob/314e3d940f1f1ac7b762ba96067e13d806476f57/flink-libraries/flink-stream-ml/src/main/scala/org/apache/flink/stream/ml/examples/MLRExample.scala
>
>
>
> On Wed, May 11, 2016 at 3:52 PM, Piyush Shrivastava  > wrote:
>
>> Hi Márton,
>>
>> I want to train and get the residuals.
>>
>> Thanks and Regards,
>> Piyush Shrivastava 
>> [image: WeboGraffiti]
>> http://webograffiti.com
>>
>>
>> On Wednesday, 11 May 2016 7:19 PM, Márton Balassi <
>> balassi.mar...@gmail.com> wrote:
>>
>>
>> Hey Piyush,
>>
>> Would you like to train or predict on the streaming data?
>>
>> Best,
>>
>> Marton
>>
>> On Wed, May 11, 2016 at 3:44 PM, Piyush Shrivastava <
>> piyush...@yahoo.co.in> wrote:
>>
>> Hello all,
>>
>> I want to perform linear regression using FlinkML's
>> MultipleLinearRegression() function on streaming data.
>>
>> This function takes a DataSet as an input and I cannot create a DataSet
>> inside the MapFunction of a DataStream. How can I use this function on my
>> DataStream?
>>
>> Thanks and Regards,
>> Piyush Shrivastava 
>> [image: WeboGraffiti]
>> http://webograffiti.com
>>
>>
>>
>>
>>
>


Re: Using ML lib SVM with Java

2016-05-09 Thread Simone Robutti
To my knowledge FlinkML does not support a unified API and most things
must be used exclusively with Scala DataSets.
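
For reference, a minimal Scala sketch of the kind of code FlinkML expects (parameters
and data are illustrative):

import org.apache.flink.api.scala._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector

val env = ExecutionEnvironment.getExecutionEnvironment

// labels must be +1.0 / -1.0 for the SVM
val training: DataSet[LabeledVector] = env.fromElements(
  LabeledVector(1.0, DenseVector(Array(1.0, 2.0))),
  LabeledVector(-1.0, DenseVector(Array(-1.0, -2.0))))

val svm = SVM().setIterations(10)
svm.fit(training)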

2016-05-09 13:31 GMT+02:00 Malte Schwarzer :

> Hi folks,
>
> I tried to get the FlinkML SVM running - but it didn't really work. The
> SVM.fit() method requires a DataSet parameter but there is no
> such class/interface in Flink Java. Or am I mixing something up with
> Scala? Also, I couldn't find a Flink ML example for Java (there is only
> Scala).
>
> Is there any guide how to use the Flink ML Lib with Java? Or is Java
> currently not yet available for the ML lib?
>
>
> Best regards
> Malte
>
>


Re: Creating a custom operator

2016-05-09 Thread Simone Robutti
nsidered to
> be a stable interface. So certain things might change in the next versions.
> With 1.0 we stabilized the DataSet API and I would rather put a new API on
> top of it than on the Table API.
> - Regarding the transformation in H2O structures and calling H2O
> operations, I think this might again be done in MapPartition operators. In
> general, MapPartition gives you a lot of freedom because it provides an
> iterator over all elements of a partition. So you can do things before the
> first and after the last element and group data as you like. You can use
> partitionByHash() or rebalace() to shuffle data and sortPartition to
> locally sort the data in a partition. Please note that MapPartition
> operators do not support chaining and come therefore with a certain
> serialization overhead. Whenever possible you should use a MapFunction or
> FlatMapFunction which are a bit more lightweight.
>
> Hope this helps,
> Fabian
>
>
> 2016-05-03 15:13 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>
>> I'm not sure this is the right way to do it but we were exploring all the
>> possibilities and this one is the most obvious. We also spent some time
>> studying how to do it to achieve a better understanding of Flink's internals.
>>
>> What we want to do though is to integrate Flink with another distributed
>> system that builds its own nodes and coordinates through the network with
>> its own logic. This software is H2O (a Machine Learning platform) and the
>> integration consists of two big tasks: the first is to instantiate an H2O
>> node in every task manager and handle the lifecycle of the node according
>> to the taskmanager and the execution graph. The second is to allow the
>> developer to code everything inside Flink, converting from and to H2O's
>> data structures (distributed tabular data) and triggering the execution of
>> algorithms on H2O with a uniform API.
>>
>> Here's a simple example (assuming that we will use the TableAPI):
>>
>> val env = ExecutionEnvironment.getExecutionEnvironment
>> val h2oEnv = H2OEnviroment.getEnvironment(env)
>>
>> val myData: Table = ...
>> val someOtherData: Table = ...
>>
>> val myH2OFrame = myData.select(...).toH2OFrame(h2oEnv)
>>
>> val linearRegressionModel = h2oEnv.linearRegression(myH2OFrame)
>>
>> val predictions:Table=linearRegressionModel(someOtherData)
>>
>> predictions.select(...)
>>
>>
>> A good solution should allow the system to keep the H2O's nodes alive
>> through multiple tasks and the possibility to move the data locally from
>> Flink to H2O. The latter is not achieved in H2O's integration with Spark
>> but we still hope to do it.
>>
>> That said, I'm still not sure if it is really required to implement a
>> custom runtime operator but given the complexity of the integration of two
>> distributed systems, we assumed that more control would allow more
>> flexibility and possibilities to achieve an ideal solution.
>>
>>
>>
>>
>>
>> 2016-05-03 13:29 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Hi Simone,
>>>
>>> you are right, the interfaces you extend are not considered to be
>>> public, user-facing API.
>>> Adding custom operators to the DataSet API touches many parts of the
>>> system and is not straightforward.
>>> The DataStream API has better support for custom operators.
>>>
>>> Can you explain what kind of operator you would like to add?
>>> Maybe the functionality can be achieved with the existing operators.
>>>
>>> Best, Fabian
>>>
>>> 2016-05-03 12:54 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>
>>> :
>>>
>>>> Hello Fabian,
>>>>
>>>> we delved deeper, moving from the input you gave us, but a question arose.
>>>> We always assumed that runtime operators were open for extension without
>>>> modifying anything inside Flink but it looks like this is not the case and
>>>> the documentation assumes that the developer is working on a contribution
>>>> to Flink. So I would like to know if our understanding is correct and
>>>> custom runtime operators are not supposed to be implemented outside of
>>>> Flink.
>>>>
>>>> Thanks,
>>>>
>>>> Simone
>>>>
>>>> 2016-04-29 21:32 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> Hi Simone,
>>>>>
>>>>> the GraphCreatingVisitor transforms the common ope

Re: Bug while using Table API

2016-05-04 Thread Simone Robutti
Here is the code:

package org.example

import org.apache.flink.api.scala._
import org.apache.flink.api.table.TableEnvironment

object Job {
  def main(args: Array[String]) {
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)


val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao",
1))
val expr = tEnv.fromDataSet(input)
val result = expr
  .groupBy("word")
  .select("word , count.sum as count")
tEnv.toDataSet[WC](result).print()

env.execute("Flink Scala API Skeleton")
  }
}

case class WC(word:String,count:Int)


Re: Creating a custom operator

2016-05-03 Thread Simone Robutti
I'm not sure this is the right way to do it but we were exploring all the
possibilities and this one is the most obvious. We also spent some time
studying how to do it to achieve a better understanding of Flink's internals.

What we want to do though is to integrate Flink with another distributed
system that builds its own nodes and coordinates through the network with
its own logic. This software is H2O (a Machine Learning platform) and the
integration consists of two big tasks: the first is to instantiate an H2O
node in every task manager and handle the lifecycle of the node according
to the taskmanager and the execution graph. The second is to allow the
developer to code everything inside Flink, converting from and to H2O's
data structures (distributed tabular data) and triggering the execution of
algorithms on H2O with a uniform API.

Here's a simple example (assuming that we will use the TableAPI):

val env = ExecutionEnvironment.getExecutionEnvironment
val h2oEnv = H2OEnviroment.getEnvironment(env)

val myData: Table = ...
val someOtherData: Table = ...

val myH2OFrame = myData.select(...).toH2OFrame(h2oEnv)

val linearRegressionModel = h2oEnv.linearRegression(myH2OFrame)

val predictions:Table=linearRegressionModel(someOtherData)

predictions.select(...)


A good solution should allow the system to keep the H2O's nodes alive
through multiple tasks and the possibility to move the data locally from
Flink to H2O. The latter is not achieved in H2O's integration with Spark
but we still hope to do it.

That said, I'm still not sure if it is really required to implement a
custom runtime operator but given the complexity of the integration of two
distributed systems, we assumed that more control would allow more
flexibility and possibilities to achieve an ideal solution.





2016-05-03 13:29 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Simone,
>
> you are right, the interfaces you extend are not considered to be public,
> user-facing API.
> Adding custom operators to the DataSet API touches many parts of the
> system and is not straightforward.
> The DataStream API has better support for custom operators.
>
> Can you explain what kind of operator you would like to add?
> Maybe the functionality can be achieved with the existing operators.
>
> Best, Fabian
>
> 2016-05-03 12:54 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>
>> Hello Fabian,
>>
>> we delved deeper, moving from the input you gave us, but a question arose.
>> We always assumed that runtime operators were open for extension without
>> modifying anything inside Flink but it looks like this is not the case and
>> the documentation assumes that the developer is working on a contribution
>> to Flink. So I would like to know if our understanding is correct and
>> custom runtime operators are not supposed to be implemented outside of
>> Flink.
>>
>> Thanks,
>>
>> Simone
>>
>> 2016-04-29 21:32 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> Hi Simone,
>>>
>>> the GraphCreatingVisitor transforms the common operator plan into a
>>> representation that is translated by the optimizer.
>>> You have to implement an OptimizerNode and OperatorDescriptor to
>>> describe the operator.
>>> Depending on the semantics of the operator, there are a few more places
>>> to make the integration working like driver strategies, cost model, etc.
>>>
>>> I would recommend to have a look at previous changes that added an
>>> operator such as PartitionOperator, SortPartitionOperator, OuterJoin, etc.
>>> The respective commits should give you an idea which parts of the code
>>> need to be touched. You should find the commit IDs in the JIRA issues for
>>> these extensions.
>>>
>>> Cheers, Fabian
>>>
>>>
>>>
>>>
>>>
>>> 2016-04-29 15:32 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>
>>> :
>>>
>>>> Hello,
>>>>
>>>> I'm trying to create a custom operator to explore the internals of
>>>> Flink. Actually the one I'm working on is rather similar to Union and I'm
>>>> trying to mimic it for now. When I run my job though, this error arises:
>>>>
>>>> Exception in thread "main" java.lang.IllegalArgumentException: Unknown
>>>> operator type: MyOperator - My Operator
>>>> at
>>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>>>> at
>>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatin

Re: Creating a custom operator

2016-05-03 Thread Simone Robutti
Hello Fabian,

we delved deeper, moving from the input you gave us, but a question arose. We
always assumed that runtime operators were open for extension without
modifying anything inside Flink but it looks like this is not the case and
the documentation assumes that the developer is working on a contribution
to Flink. So I would like to know if our understanding is correct and
custom runtime operators are not supposed to be implemented outside of
Flink.

Thanks,

Simone

2016-04-29 21:32 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Simone,
>
> the GraphCreatingVisitor transforms the common operator plan into a
> representation that is translated by the optimizer.
> You have to implement an OptimizerNode and OperatorDescriptor to describe
> the operator.
> Depending on the semantics of the operator, there are a few more places to
> make the integration working like driver strategies, cost model, etc.
>
> I would recommend to have a look at previous changes that added an
> operator such as PartitionOperator, SortPartitionOperator, OuterJoin, etc.
> The respective commits should give you an idea which parts of the code
> need to be touched. You should find the commit IDs in the JIRA issues for
> these extensions.
>
> Cheers, Fabian
>
>
>
>
>
> 2016-04-29 15:32 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:
>
>> Hello,
>>
>> I'm trying to create a custom operator to explore the internals of Flink.
>> Actually the one I'm working on is rather similar to Union and I'm trying
>> to mimic it for now. When I run my job though, this error arises:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Unknown
>> operator type: MyOperator - My Operator
>> at
>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>> at
>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
>> at
>> org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
>> at
>> org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
>> at org.apache.flink.api.common.Plan.accept(Plan.java:348)
>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>> at
>> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
>> at
>> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
>> at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
>> at io.radicalbit.flinkh2o.Job.main(Job.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>
>> I looked at the location of the error but it's not clear to me how to
>> make my operator recognizable to the optimizer.
>>
>> Thanks,
>>
>> Simone
>>
>
>


Creating a custom operator

2016-04-29 Thread Simone Robutti
Hello,

I'm trying to create a custom operator to explore the internals of Flink.
Actually the one I'm working on is rather similar to Union and I'm trying
to mimic it for now. When I run my job though, this error arises:

Exception in thread "main" java.lang.IllegalArgumentException: Unknown
operator type: MyOperator - My Operator
at
org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
at
org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
at
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
at
org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
at org.apache.flink.api.common.Plan.accept(Plan.java:348)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
at
org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
at
org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
at io.radicalbit.flinkh2o.Job.main(Job.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

I looked at the location of the error but it's not clear to me how to make
my operator recognizable to the optimizer.

Thanks,

Simone


Create a cluster inside Flink

2016-04-28 Thread Simone Robutti
Hello everyone,

I'm approaching a rather big and complex integration with an existing piece
of software and I would like to hear the opinion of more experienced users on
how to tackle a few issues.

This software builds a cloud with its own logic. What I need is to keep
these nodes as instances inside the TaskManagers and use these instances to
perform operation with dedicated operators. I need to move tabular data
back and forth from and to Flink's Datasets and be able to call methods on
these instances.

I would like to receive suggestions on how to implement this behaviour.
First I thought about using Flink's actor system but I discovered it is not
accessible. So I would like to understand how to properly create these
instances (new thread inside a mapPartition?), how to call methods on them
(create a custom context?) and convert data from a Dataset or a Table to
the custom format of this software (this probably won't be much of a
problem, I will write wrappers or at worst replicate the data).
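
For illustration, one possible shape of the mapPartition idea (everything here is
hypothetical: ExternalNode stands in for a node of the other system and the record
type is just a String): a RichMapPartitionFunction whose open()/close() manage the
node's lifecycle while mapPartition() converts and hands over the partition's records.

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// hypothetical handle to a node of the external system
trait ExternalNode {
  def process(record: String): String
  def shutdown(): Unit
}
object ExternalNode {
  def startOrAttach(): ExternalNode = ??? // start a node or attach to one on this taskmanager
}

class ExternalNodeBridge extends RichMapPartitionFunction[String, String] {
  @transient private var node: ExternalNode = _

  override def open(parameters: Configuration): Unit = {
    node = ExternalNode.startOrAttach() // one node instance per parallel task
  }

  override def mapPartition(values: java.lang.Iterable[String], out: Collector[String]): Unit = {
    val it = values.iterator()
    while (it.hasNext) out.collect(node.process(it.next())) // convert / hand over each record
  }

  override def close(): Unit = node.shutdown()
}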

Any suggestion is welcome.

Thanks,

Simone


Explanation on limitations of the Flink Table API

2016-04-21 Thread Simone Robutti
Hello,

I would like to know if it's possible to create a Flink Table from an
arbitrary CSV (or any other form of tabular data) without doing type-safe
parsing with explicitly typed classes/POJOs.

To my knowledge this is not possible but I would like to know if I'm
missing something. My requirement is to be able to read a CSV file and
manipulate it reading the field names from the file and inferring data
types.

Thanks,

Simone


How to test serializability of a Flink job

2016-04-05 Thread Simone Robutti
Hello,

last week I got a problem where my job worked in local mode but could not
be serialized on the cluster. I assume that local mode does not really
serialize all the operators (the problem was with a custom map function)
and I need to enforce this behaviour in local mode or, better, be able to
write tests that verify that a class or a job could be successfully
serialized.
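
For what it's worth, a simple check that can go into a unit test is a plain Java
serialization round trip, which is essentially what shipping the user function to the
cluster requires (a sketch; MyCustomMapFunction is a placeholder for your own function):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

def assertSerializable(obj: AnyRef): Unit = {
  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  // writeObject throws NotSerializableException if a captured field is not serializable
  try oos.writeObject(obj) finally oos.close()
}

// e.g. in a test: assertSerializable(new MyCustomMapFunction())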

Thanks,

Simone


Re: Flink ML 1.0.0 - Saving and Loading Models to Score a Single Feature Vector

2016-03-29 Thread Simone Robutti
To my knowledge there is nothing like that. PMML is not supported in any
form and there's no custom saving format yet. If you really need a quick
and dirty solution, it's not that hard to serialize the model into a file.

2016-03-28 17:59 GMT+02:00 Sourigna Phetsarath :

> Flinksters,
>
> Is there an example of saving a Trained Model, loading a Trained Model and
> then scoring one or more feature vectors using Flink ML?
>
> All of the examples I've seen have shown only sequential fit and predict.
>
> Thank you.
>
> -Gna
> --
>
>
> *Gna Phetsarath*System Architect // AOL Platforms // Data Services //
> Applied Research Chapter
> 770 Broadway, 5th Floor, New York, NY 10003
> o: 212.402.4871 // m: 917.373.7363
> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>
> * *
>


Connecting to a remote jobmanager - problem with Akka remote

2016-03-22 Thread Simone Robutti
Hello,

we are trying to set up our system to do remote debugging through Intellij.
Flink is running on a yarn long running session. We are launching Flink's
CliFrontend with the following parameters:

>  run -m **::48252
/Users//Projects/flink/build-target/examples/batch/WordCount.jar

The error raised is the following:

 ERROR akka.remote.EndpointWriter-
dropping message [class akka.actor.ActorSelectionMessage] for non-local
recipient [Actor[akka.tcp://flink@**:48252/]]
arriving at [akka.tcp://flink@**:48252] inbound
addresses are [akka.tcp://flink@**:48252]

The meaning of the error looks clear but we don't know how to fix it. Any
advice?


Flink Checkpoint on yarn

2016-03-19 Thread Simone Robutti
Hello,

I'm testing the checkpointing functionality with hdfs as a backend.
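
For reference, a minimal sketch of such a setup (the path and interval are
illustrative, not taken from the job in question):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000) // checkpoint every second
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints")) // HDFS-backed backend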

From what I can see it uses different checkpointing files and resumes the
computation from different points and not from the latest available. This
is to me unexpected behaviour.

I log every second, for every worker, a counter that is increased by 1 at
each step.

So for example on node-1 the count goes up to 5, then I kill a job manager
or task manager and it resumes from 5 or 4 and it's ok. The next time I
kill a job manager the count is at 15 and it resumes at 14 or 15. Sometimes
it may happen that at a third kill the work resumes at 4 or 5 as if the
checkpoint resumed the second time wasn't there.

Once I even saw it jump forward: the first kill is at 10 and it resumes at
9, the second kill is at 70 and it resumes at 9, the third kill is at 15
but it resumes at 69 as if it resumed from the second kill checkpoint.

This is clearly inconsistent.

Also, in the logs I can find that sometimes it uses a checkpoint file
different from the previous, consistent resume.

What am I doing wrong? Is it a known bug?


Re: Flink Checkpoint on yarn

2016-03-19 Thread Simone Robutti
This is the log filtered to check messages from
ZooKeeperCompletedCheckpointStore.

https://gist.github.com/chobeat/0222b31b87df3fa46a23

It looks like it finds only one checkpoint but I'm not sure if the different
hashes and IDs of the checkpoints are meaningful or not.



2016-03-16 15:33 GMT+01:00 Ufuk Celebi <u...@apache.org>:

> Can you please have a look into the JobManager log file and report
> which checkpoints are restored? You should see messages from
> ZooKeeperCompletedCheckpointStore like:
> - Found X checkpoints in ZooKeeper
> - Initialized with X. Removing all older checkpoints
>
> You can share the complete job manager log file as well if you like.
>
> – Ufuk
>
> On Wed, Mar 16, 2016 at 2:50 PM, Simone Robutti
> <simone.robu...@radicalbit.io> wrote:
> > Hello,
> >
> > I'm testing the checkpointing functionality with hdfs as a backend.
> >
> > For what I can see it uses different checkpointing files and resume the
> > computation from different points and not from the latest available.
> This is
> > to me an unexpected behaviour.
> >
> > I log every second, for every worker, a counter that is increased by 1 at
> > each step.
> >
> > So for example on node-1 the count goes up to 5, then I kill a job
> manager
> > or task manager and it resumes from 5 or 4 and it's ok. The next time I
> kill
> > a job manager the count is at 15 and it resumes at 14 or 15. Sometimes it
> > may happen that at a third kill the work resumes at 4 or 5 as if the
> > checkpoint resumed the second time wasn't there.
> >
> > Once I even saw it jump forward: the first kill is at 10 and it resumes
> at
> > 9, the second kill is at 70 and it resumes at 9, the third kill is at 15
> but
> > it resumes at 69 as if it resumed from the second kill checkpoint.
> >
> > This is clearly inconsistent.
> >
> > Also, in the logs I can find that sometimes it uses a checkpoint file
> > different from the previous, consistent resume.
> >
> > What am I doing wrong? Is it a known bug?
>


Re: Flink Checkpoint on yarn

2016-03-19 Thread Simone Robutti
Actually the test was intended for a single job. The fact that there are
more jobs is unexpected and it will be the first thing to verify.
Considering these problems we will go for deeper tests with multiple jobs.

The logs are collected with "yarn logs" but log aggregation is not properly
configured so I wouldn't rely too much on that. Before doing the tests
tomorrow I will clear all the existing logs just to be sure.

2016-03-16 18:19 GMT+01:00 Ufuk Celebi <u...@apache.org>:

> OK, so you are submitting multiple jobs, but you submit them with -m
> yarn-cluster and therefore expect them to start separate YARN
> clusters. Makes sense and I would expect the same.
>
> I think that you can check in the client logs printed to stdout to
> which cluster the job is submitted.
>
> PS: The logs you have shared are out-of-order, how did you gather
> them? Do you have an idea why they are out of order? Maybe something
> is mixed up in the way we gather the logs and we only think that
> something is wrong because of this.
>
>
> On Wed, Mar 16, 2016 at 6:11 PM, Simone Robutti
> <simone.robu...@radicalbit.io> wrote:
> > I didn't resubmitted the job. Also the jobs are submitted one by one
> with -m
> > yarn-master, not with a long running yarn session so I don't really know
> if
> > they could mix up.
> >
> > I will repeat the test with a cleaned state because we saw that killing
> the
> > job with yarn application -kill left the "flink run" process alive so
> that
> > may be the problem. We just noticed a few minutes ago.
> >
> > If the problem persists, I will eventually come back with a full log.
> >
> > Thanks for now,
> >
> > Simone
> >
> > 2016-03-16 18:04 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> >>
> >> Hey Simone,
> >>
> >> from the logs it looks like multiple jobs have been submitted to the
> >> cluster, not just one. The different files correspond to different
> >> jobs recovering. The filtered logs show three jobs running/recovering
> >> (with IDs 10d8ccae6e87ac56bf763caf4bc4742f,
> >> 124f29322f9026ac1b35435d5de9f625, 7f280b38065eaa6335f5c3de4fc82547).
> >>
> >> Did you manually re-submit the job after killing a job manager?
> >>
> >> Regarding the counts, it can happen that they are rolled back to a
> >> previous consistent state if the checkpoint was not completed yet
> >> (including the write to ZooKeeper). In that case the job state will be
> >> rolled back to an earlier consistent state.
> >>
> >> Can you please share the complete job manager logs of your program?
> >> The most helpful thing will be to have a log for each started job
> >> manager container. I don't know if that is easily possible.
> >>
> >> – Ufuk
> >>
> >> On Wed, Mar 16, 2016 at 4:12 PM, Simone Robutti
> >> <simone.robu...@radicalbit.io> wrote:
> >> > This is the log filtered to check messages from
> >> > ZooKeeperCompletedCheckpointStore.
> >> >
> >> > https://gist.github.com/chobeat/0222b31b87df3fa46a23
> >> >
> >> > It looks like it finds only a checkpoint but I'm not sure if the
> >> > different
> >> > hashes and IDs of the checkpoints are meaningful or not.
> >> >
> >> >
> >> >
> >> > 2016-03-16 15:33 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> >> >>
> >> >> Can you please have a look into the JobManager log file and report
> >> >> which checkpoints are restored? You should see messages from
> >> >> ZooKeeperCompletedCheckpointStore like:
> >> >> - Found X checkpoints in ZooKeeper
> >> >> - Initialized with X. Removing all older checkpoints
> >> >>
> >> >> You can share the complete job manager log file as well if you like.
> >> >>
> >> >> – Ufuk
> >> >>
> >> >> On Wed, Mar 16, 2016 at 2:50 PM, Simone Robutti
> >> >> <simone.robu...@radicalbit.io> wrote:
> >> >> > Hello,
> >> >> >
> >> >> > I'm testing the checkpointing functionality with hdfs as a backend.
> >> >> >
> >> >> > For what I can see it uses different checkpointing files and resume
> >> >> > the
> >> >> > computation from different points and not from the latest
> available.
> >> >> > This is
> >> >> > to me an unexpected behaviour.
> >> >>

Re: Flink Checkpoint on yarn

2016-03-19 Thread Simone Robutti
I didn't resubmitted the job. Also the jobs are submitted one by one with
-m yarn-master, not with a long running yarn session so I don't really know
if they could mix up.

I will repeat the test with a cleaned state because we saw that killing the
job with yarn application -kill left the "flink run" process alive so that
may be the problem. We just noticed a few minutes ago.

If the problem persists, I will eventually come back with a full log.

Thanks for now,

Simone

2016-03-16 18:04 GMT+01:00 Ufuk Celebi <u...@apache.org>:

> Hey Simone,
>
> from the logs it looks like multiple jobs have been submitted to the
> cluster, not just one. The different files correspond to different
> jobs recovering. The filtered logs show three jobs running/recovering
> (with IDs 10d8ccae6e87ac56bf763caf4bc4742f,
> 124f29322f9026ac1b35435d5de9f625, 7f280b38065eaa6335f5c3de4fc82547).
>
> Did you manually re-submit the job after killing a job manager?
>
> Regarding the counts, it can happen that they are rolled back to a
> previous consistent state if the checkpoint was not completed yet
> (including the write to ZooKeeper). In that case the job state will be
> rolled back to an earlier consistent state.
>
> Can you please share the complete job manager logs of your program?
> The most helpful thing will be to have a log for each started job
> manager container. I don't know if that is easily possible.
>
> – Ufuk
>
> On Wed, Mar 16, 2016 at 4:12 PM, Simone Robutti
> <simone.robu...@radicalbit.io> wrote:
> > This is the log filtered to check messages from
> > ZooKeeperCompletedCheckpointStore.
> >
> > https://gist.github.com/chobeat/0222b31b87df3fa46a23
> >
> > It looks like it finds only a checkpoint but I'm not sure if the
> different
> > hashes and IDs of the checkpoints are meaningful or not.
> >
> >
> >
> > 2016-03-16 15:33 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> >>
> >> Can you please have a look into the JobManager log file and report
> >> which checkpoints are restored? You should see messages from
> >> ZooKeeperCompletedCheckpointStore like:
> >> - Found X checkpoints in ZooKeeper
> >> - Initialized with X. Removing all older checkpoints
> >>
> >> You can share the complete job manager log file as well if you like.
> >>
> >> – Ufuk
> >>
> >> On Wed, Mar 16, 2016 at 2:50 PM, Simone Robutti
> >> <simone.robu...@radicalbit.io> wrote:
> >> > Hello,
> >> >
> >> > I'm testing the checkpointing functionality with hdfs as a backend.
> >> >
> >> > For what I can see it uses different checkpointing files and resume
> the
> >> > computation from different points and not from the latest available.
> >> > This is
> >> > to me an unexpected behaviour.
> >> >
> >> > I log every second, for every worker, a counter that is increased by 1
> >> > at
> >> > each step.
> >> >
> >> > So for example on node-1 the count goes up to 5, then I kill a job
> >> > manager
> >> > or task manager and it resumes from 5 or 4 and it's ok. The next time
> I
> >> > kill
> >> > a job manager the count is at 15 and it resumes at 14 or 15. Sometimes
> >> > it
> >> > may happen that at a third kill the work resumes at 4 or 5 as if the
> >> > checkpoint resumed the second time wasn't there.
> >> >
> >> > Once I even saw it jump forward: the first kill is at 10 and it
> resumes
> >> > at
> >> > 9, the second kill is at 70 and it resumes at 9, the third kill is at
> 15
> >> > but
> >> > it resumes at 69 as if it resumed from the second kill checkpoint.
> >> >
> >> > This is clearly inconsistent.
> >> >
> >> > Also, in the logs I can find that sometimes it uses a checkpoint file
> >> > different from the previous, consistent resume.
> >> >
> >> > What am I doing wrong? Is it a known bug?
> >
> >
>


Re: Java Maps and Type Information

2016-03-02 Thread Simone Robutti
Ok, I made it work but there's still an issue. I used
.returns(java.util.Map.class) after the "map" call and it works with this
simple function but it doesn't compile with my CustomMapFunction that
extends MapFunction. It gives a compilation error on the .returns() call.

This is the case only if the variable operator is of type CustomMapFunction
but if I do

> MapFunction operator = new CustomMapFunction();

it works again.

If I go back to

> CustomMapFunction operator = new CustomMapFunction();

it gives this error:

>Error:(43, 87) java: no suitable method found for
returns(java.lang.Class)
method

Should I open an issue?

2016-03-01 21:45 GMT+01:00 Simone Robutti <simone.robu...@radicalbit.io>:

> I tried to simplify it to the bones but I'm actually defining a custom
> MapFunction<java.util.Map<String,Object>,java.util.Map<String,Object>> that
> even with a simple identity function fails at runtime giving me the
> following error:
>
> >Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: The return
> type of function 'main(Main.java:45)' could not be determined
> automatically, due to type erasure. You can give type information hints by
> using the returns(...) method on the result of the transformation call, or
> by letting your function implement the 'ResultTypeQueryable' interface.
>
> where the line 45 is the line where I invoke the map function.
>
> Here the piece of code:
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> Map<String,Object> inputMap = new HashMap<String,Object>();
> inputMap.put("sepal_width",2.0);
> inputMap.put("sepal_length",2.0);
> inputMap.put("petal_width",2.0);
> inputMap.put("petal_length",2.0);
>
> MapFunction operator=new
> MapFunction<Map<String,Object>,Map<String,Object>>(){
>
> @Override
> public Map<String, Object> map(Map<String, Object>
> stringObjectMap) throws Exception {
> return stringObjectMap;
> }
> };
>
> List<Map<String, Object>> input = new LinkedList<>();
> input.add(inputMap);
> DataSource<Map<String, Object>> dataset =
> env.fromCollection(input);
> List<java.util.Map<FieldName, Object>> collectedResult =
> dataset.map(operator).collect();
>
>
>
>
> 2016-03-01 16:42 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
>> Hi,
>> what kind of program are you writing? I just wrote a quick example using
>> the DataStream API where I’m using Map<String, Tuple2<String, Integer>> as
>> the output type of one of my MapFunctions.
>>
>> Cheers,
>> Aljoscha
>> > On 01 Mar 2016, at 16:33, Simone Robutti <simone.robu...@radicalbit.io>
>> wrote:
>> >
>> > Hello,
>> >
>> > to my knowledge is not possible to use a java.util.Map for example in a
>> FlatMapFunction<java.util.Map, java.util.Map>. Is that correct? It gives a
>> typer error at runtime and it doesn't work even with explicit
>> TypeInformation hints.
>> >
>> > Is there any way to make it work?
>> >
>> > Thanks,
>> >
>> > Simone
>>
>>
>


Re: Java Maps and Type Information

2016-03-01 Thread Simone Robutti
I tried to simplify it to the bones but I'm actually defining a custom
MapFunction<java.util.Map<String,Object>,java.util.Map<String,Object>> that
even with a simple identity function fails at runtime giving me the
following error:

>Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: The return
type of function 'main(Main.java:45)' could not be determined
automatically, due to type erasure. You can give type information hints by
using the returns(...) method on the result of the transformation call, or
by letting your function implement the 'ResultTypeQueryable' interface.

where the line 45 is the line where I invoke the map function.

Here the piece of code:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
Map<String,Object> inputMap = new HashMap<String,Object>();
inputMap.put("sepal_width",2.0);
inputMap.put("sepal_length",2.0);
inputMap.put("petal_width",2.0);
inputMap.put("petal_length",2.0);

MapFunction operator=new
MapFunction<Map<String,Object>,Map<String,Object>>(){

@Override
public Map<String, Object> map(Map<String, Object>
stringObjectMap) throws Exception {
return stringObjectMap;
}
};

List<Map<String, Object>> input = new LinkedList<>();
input.add(inputMap);
DataSource<Map<String, Object>> dataset = env.fromCollection(input);
List<java.util.Map<FieldName, Object>> collectedResult =
dataset.map(operator).collect();




2016-03-01 16:42 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi,
> what kind of program are you writing? I just wrote a quick example using
> the DataStream API where I’m using Map<String, Tuple2<String, Integer>> as
> the output type of one of my MapFunctions.
>
> Cheers,
> Aljoscha
> > On 01 Mar 2016, at 16:33, Simone Robutti <simone.robu...@radicalbit.io>
> wrote:
> >
> > Hello,
> >
> > to my knowledge is not possible to use a java.util.Map for example in a
> FlatMapFunction<java.util.Map, java.util.Map>. Is that correct? It gives a
> typer error at runtime and it doesn't work even with explicit
> TypeInformation hints.
> >
> > Is there any way to make it work?
> >
> > Thanks,
> >
> > Simone
>
>


Compilation error while instancing FlinkKafkaConsumer082

2016-02-10 Thread Simone Robutti
Hello,

the compiler has been raising an error since I added this line to the code

val testData=streamEnv.addSource(new
FlinkKafkaConsumer082[String]("data-input",new
SimpleStringSchema(),kafkaProp))

Here is the error:

Error:scalac: Class
org.apache.flink.streaming.api.checkpoint.CheckpointNotifier not found -
continuing with a stub.
Error:scalac: Class
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
not found - continuing with a stub.
Warning:scalac: Class
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
not found - continuing with a stub.
Warning:scalac: Class
org.apache.flink.streaming.api.checkpoint.CheckpointNotifier not found -
continuing with a stub.

I've been using the 1.0-Snapshot version built with Scala 2.11. Any
suggestion?