Re: Multiple operations on a WindowedStream

2016-04-08 Thread Aljoscha Krettek
Hi,
the sources don't report records consumed. This is a bit confusing, but the
records sent/records consumed statistics only cover Flink-internal
sending of records, so a Kafka source will only show sent records.

To really see each operator in isolation you should disable chaining for
these tests:

env.disableOperatorChaining()

Cheers,
Aljoscha

On Sat, 9 Apr 2016 at 05:12 Kanak Biscuitwala  wrote:

> It turns out that the problem is deeper than I originally thought. The
> Flink dashboard reports that 0 records are being consumed, which is quite
> odd. Is there some issue with the 0.9 consumer on YARN?
>


Re: Access an event's TimeWindow?

2016-04-08 Thread Aljoscha Krettek
Hi,
for cases like these there is a family of "apply" methods on WindowedStream
that also take an incremental aggregation function. For example, there is

.apply(ReduceFunction, WindowFunction)

What this will do is incrementally aggregate the contents of the window.
When the window result should be emitted, the single aggregated value is
passed to the WindowFunction, which also gets meta information such as the
TimeWindow. There you can emit the window result along with the meta
information.
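
For example, assuming a stream `counts` of Tuple2<String, Long> elements keyed
by the first field (the stream name, types and window size here are only
placeholders), a rough sketch could look like this:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

DataStream<Tuple3<String, Long, TimeWindow>> perWindow = counts
    .keyBy(0)
    .timeWindow(Time.minutes(1))
    .apply(
        // incremental part: only the running sum is kept as window state
        new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                return new Tuple2<>(a.f0, a.f1 + b.f1);
            }
        },
        // called once per window with the single pre-aggregated value plus the TimeWindow
        new WindowFunction<Tuple2<String, Long>, Tuple3<String, Long, TimeWindow>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window,
                              Iterable<Tuple2<String, Long>> aggregated,
                              Collector<Tuple3<String, Long, TimeWindow>> out) {
                Tuple2<String, Long> sum = aggregated.iterator().next();
                out.collect(new Tuple3<>(sum.f0, sum.f1, window));
            }
        });

The window itself (its start and end timestamps) is then part of every emitted
element.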

Cheers,
Aljoscha

On Sat, 9 Apr 2016 at 03:53 Elias Levy  wrote:

> Is there an API to access an event's time window?  When you are computing
> aggregates over a time window, you usually want to output the window along
> with the aggregate.  I could compute the Window on my own, but this seems
> like an API that should exist.
>


Re: WindowedStream sum behavior

2016-04-08 Thread Aljoscha Krettek
Hi,
there are no guarantees for the fields other than the summed field (and
any key fields). In practice I think it's either the fields from the
first or the last record, but I wouldn't rely on that.
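
If the other fields do matter, one way to get defined semantics is to spell
the aggregation out as an explicit reduce instead of sum. A rough sketch,
assuming a Tuple3<String, String, Long> input where f0 is the key and f2 is
the value being summed:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Tuple3<String, String, Long>> sums = input
    .keyBy(0)
    .timeWindow(Time.minutes(1))
    .reduce(new ReduceFunction<Tuple3<String, String, Long>>() {
        @Override
        public Tuple3<String, String, Long> reduce(Tuple3<String, String, Long> a,
                                                   Tuple3<String, String, Long> b) {
            // f0 is the key, f1 is taken explicitly from one of the two inputs,
            // and f2 is summed, so nothing is left to chance
            return new Tuple3<>(a.f0, b.f1, a.f2 + b.f2);
        }
    });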

Cheers,
Aljoscha

On Sat, 9 Apr 2016 at 03:19 Elias Levy  wrote:

> I am wondering about the expected behavior of the sum method.  Obviously
> it sums a specific field in a tuple or POJO. But what should one expect in
> other fields?  Does sum keep the first field, last field or there aren't
> any guarantees?
>


RE: Multiple operations on a WindowedStream

2016-04-08 Thread Kanak Biscuitwala
It turns out that the problem is deeper than I originally thought. The Flink
dashboard reports that 0 records are being consumed, which is quite odd. Is
there some issue with the 0.9 consumer on YARN?

From: aljos...@apache.org
Date: Thu, 7 Apr 2016 09:56:42 +
Subject: Re: Multiple operations on a WindowedStream
To: user@flink.apache.org

Hi,
the code seems alright? Did you try looking at the Flink Dashboard to check out 
whether any of the operations are sending elements?

Cheers,
Aljoscha

On Tue, 5 Apr 2016 at 21:00 Kanak Biscuitwala  wrote:
This worked when I ran my test code locally, but I'm seeing nothing reach my 
sink when I try to run this in YARN (previously, when I just echo'ed all sums 
to my sink, it would work).

Here's what my code looks like:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer09 consumer = new FlinkKafkaConsumer09<>(
INPUT_TOPIC, new KafkaMessageDeserializer(), properties);
env.enableCheckpointing(5000);

// this (or event time) is required in order to do the double-windowing 
below
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

DataStream stream = env
.addSource(consumer)
.flatMap(new CountRequests())
.keyBy(0, 1)
.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5, 
TimeUnit.SECONDS))
.sum(2)
.timeWindowAll(Time.of(5, TimeUnit.SECONDS))
.apply(new TopK(20))
.map(new ToString());
stream.addSink(new FlinkKafkaProducer09<>(OUTPUT_TOPIC, new 
SimpleStringSchema(),
properties));
env.execute(TASK_NAME);

Note that CountRequests produces Tuple3, TopK is an AllWindowFunction that 
produces List, and ToString is a MapFunction that is just a wrapper on 
Object#toString().

Anything obvious that I'm doing wrong?

> From: aljos...@apache.org
> Date: Fri, 1 Apr 2016 09:41:12 +
> Subject: Re: Multiple operations on a WindowedStream
> To: user@flink.apache.org
>
> Hi,
> if you are using ingestion-time (or event-time) as your stream time
> characteristic, i.e.:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // or
> TimeCharacteristic.EventTime
>
> you can apply several window transforms after another and they will
> apply the same "time window" because they work on the element
> timestamps. What you can then do is have a window that does the
> aggregation and then another one (that has to be global) to select the
> top elements:
>
> result = input
> .keyBy()
> .timeWindow(Time.minutes(1), Time.seconds(5))
> .sum(2)
> .timeWindowAll(Time.seconds(5)) // notice how I put a non-sliding
> window here, because the above will output a new window every 5 seconds
> .apply()
>
> I hope this helps.
>
> Cheers,
> Aljoscha
>
> On Fri, 1 Apr 2016 at 10:35 Balaji Rajagopalan
> 
> wrote:
> I had a similar use case and ended writing the aggregation logic in the
> apply function, could not find any better solution.
>
> On Fri, Apr 1, 2016 at 6:03 AM, Kanak Biscuitwala
>  wrote:
> Hi,
>
> I would like to write something that does something like a word count,
> and then emits only the 10 highest counts for that window. Logically, I
> would want to do something like:
>
> stream.timeWindow(Time.of(1, TimeUnit.MINUTES), Time.of(5,
> TimeUnit.SECONDS)).sum(2).apply(getTopK(10))
>
> However, the window context is lost after I do the sum aggregation. Is
> there a straightforward way to express this logic in Flink 1.0? One way
> I can think of is to have a complex function in apply() that has state,
> but I would like to know if there is something a little cleaner than
> that.
>
> Thanks,
> Kanak
>

  

Access an event's TimeWindow?

2016-04-08 Thread Elias Levy
Is there an API to access an event's time window?  When you are computing
aggregates over a time window, you usually want to output the window along
with the aggregate.  I could compute the Window on my own, but this seems
like an API that should exist.


WindowedStream sum behavior

2016-04-08 Thread Elias Levy
I am wondering about the expected behavior of the sum method.  Obviously it
sums a specific field in a tuple or POJO. But what should one expect in
other fields?  Does sum keep the first field, last field or there aren't
any guarantees?


Re: Flink ML 1.0.0 - Saving and Loading Models to Score a Single Feature Vector

2016-04-08 Thread Trevor Grant
I'm just about to open an issue / PR solution for 'warm-starts'.

Once this is in, we could just add a setter for the weight vector (and
whatever iteration you're on if you're going to do more partial fits).

Then all you need to save is your weight vector (and iteration number).



Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

*"Fortunate is he, who is able to know the causes of things."  -Virgil*


On Fri, Apr 8, 2016 at 9:04 AM, Behrouz Derakhshan <
behrouz.derakhs...@gmail.com> wrote:

> Is there a reason the Predictor or Estimator classes don't have read and
> write methods for saving and retrieving the model? I couldn't find Jira
> issues for it. Does it make sense to create one?
>
> BR,
> Behrouz
>
> On Wed, Mar 30, 2016 at 4:40 PM, Till Rohrmann 
> wrote:
>
>> Yes, Suneel is completely right. If the data does not implement
>> IOReadableWritable it is probably easier to use the
>> TypeSerializerOutputFormat. What you need here to serialize the data is a
>> TypeSerializer. You can obtain it the following way:
>>
>> val model = mlr.weightsOption.get
>>
>> val weightVectorTypeInfo = TypeInformation.of(classOf[WeightVector])
>> val weightVectorSerializer = weightVectorTypeInfo.createSerializer(new 
>> ExecutionConfig())
>> val outputFormat = new TypeSerializerOutputFormat[WeightVector]
>> outputFormat.setSerializer(weightVectorSerializer)
>>
>> model.write(outputFormat, "path")
>>
>> Cheers,
>> Till
>> ​
>>
>> On Tue, Mar 29, 2016 at 8:22 PM, Suneel Marthi 
>> wrote:
>>
>>> U may want to use FlinkMLTools.persist() methods which use
>>> TypeSerializerFormat and don't enforce IOReadableWritable.
>>>
>>>
>>>
>>> On Tue, Mar 29, 2016 at 2:12 PM, Sourigna Phetsarath <
>>> gna.phetsar...@teamaol.com> wrote:
>>>
 Till,

 Thank you for your reply.

 Having this issue though, WeightVector does not extend IOReadableWritable:

 public class SerializedOutputFormat<T extends IOReadableWritable>

 case class WeightVector(weights: Vector, intercept: Double) extends Serializable {}


 However, I will use the approach to write out the weights as text.


 On Tue, Mar 29, 2016 at 5:01 AM, Till Rohrmann 
 wrote:

> Hi Gna,
>
> there are no utilities yet to do that but you can do it manually. In
> the end, a model is simply a Flink DataSet which you can serialize to
> some file. Upon reading this DataSet you simply have to give it to
> your algorithm to be used as the model. The following code snippet
> illustrates this approach:
>
> mlr.fit(inputDS, parameters)
>
> // write model to disk using the SerializedOutputFormat
> mlr.weightsOption.get.write(new SerializedOutputFormat[WeightVector], 
> "path")
>
> // read the serialized model from disk
> val model = env.readFile(new SerializedInputFormat[WeightVector], "path")
>
> // set the read model for the MLR algorithm
> mlr.weightsOption = model
>
> Cheers,
> Till
> ​
>
> On Tue, Mar 29, 2016 at 10:46 AM, Simone Robutti <
> simone.robu...@radicalbit.io> wrote:
>
>> To my knowledge there is nothing like that. PMML is not supported in
>> any form and there's no custom saving format yet. If you really need a
>> quick and dirty solution, it's not that hard to serialize the model into 
>> a
>> file.
>>
>> 2016-03-28 17:59 GMT+02:00 Sourigna Phetsarath <
>> gna.phetsar...@teamaol.com>:
>>
>>> Flinksters,
>>>
>>> Is there an example of saving a Trained Model, loading a Trained
>>> Model and then scoring one or more feature vectors using Flink ML?
>>>
>>> All of the examples I've seen have shown only sequential fit and
>>> predict.
>>>
>>> Thank you.
>>>
>>> -Gna
>>> --
>>>
>>>
>>> *Gna Phetsarath*System Architect // AOL Platforms // Data Services
>>> // Applied Research Chapter
>>> 770 Broadway, 5th Floor, New York, NY 10003
>>> o: 212.402.4871 // m: 917.373.7363
>>> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>>>
>>> * *
>>>
>>
>>
>


 --


 *Gna Phetsarath*System Architect // AOL Platforms // Data Services //
 Applied Research Chapter
 770 Broadway, 5th Floor, New York, NY 10003
 o: 212.402.4871 // m: 917.373.7363
 vvmr: 8890237 aim: sphetsarath20 t: @sourigna

 * *

>>>
>>>
>>
>


Re: Flink ML 1.0.0 - Saving and Loading Models to Score a Single Feature Vector

2016-04-08 Thread Behrouz Derakhshan
Is there a reason the Predictor or Estimator classes don't have read and
write methods for saving and retrieving the model? I couldn't find Jira
issues for it. Does it make sense to create one?

BR,
Behrouz

On Wed, Mar 30, 2016 at 4:40 PM, Till Rohrmann  wrote:

> Yes, Suneel is completely right. If the data does not implement
> IOReadableWritable it is probably easier to use the
> TypeSerializerOutputFormat. What you need here to serialize the data is a
> TypeSerializer. You can obtain it the following way:
>
> val model = mlr.weightsOption.get
>
> val weightVectorTypeInfo = TypeInformation.of(classOf[WeightVector])
> val weightVectorSerializer = weightVectorTypeInfo.createSerializer(new 
> ExecutionConfig())
> val outputFormat = new TypeSerializerOutputFormat[WeightVector]
> outputFormat.setSerializer(weightVectorSerializer)
>
> model.write(outputFormat, "path")
>
> Cheers,
> Till
> ​
>
> On Tue, Mar 29, 2016 at 8:22 PM, Suneel Marthi  wrote:
>
>> U may want to use FlinkMLTools.persist() methods which use
>> TypeSerializerFormat and don't enforce IOReadableWritable.
>>
>>
>>
>> On Tue, Mar 29, 2016 at 2:12 PM, Sourigna Phetsarath <
>> gna.phetsar...@teamaol.com> wrote:
>>
>>> Till,
>>>
>>> Thank you for your reply.
>>>
>>> Having this issue though, WeightVector does not extend IOReadableWritable:
>>>
>>> public class SerializedOutputFormat<T extends IOReadableWritable>
>>>
>>> case class WeightVector(weights: Vector, intercept: Double) extends Serializable {}
>>>
>>>
>>> However, I will use the approach to write out the weights as text.
>>>
>>>
>>> On Tue, Mar 29, 2016 at 5:01 AM, Till Rohrmann 
>>> wrote:
>>>
 Hi Gna,

 there are no utilities yet to do that but you can do it manually. In
 the end, a model is simply a Flink DataSet which you can serialize to
 some file. Upon reading this DataSet you simply have to give it to
 your algorithm to be used as the model. The following code snippet
 illustrates this approach:

 mlr.fit(inputDS, parameters)

 // write model to disk using the SerializedOutputFormat
 mlr.weightsOption.get.write(new SerializedOutputFormat[WeightVector], 
 "path")

 // read the serialized model from disk
 val model = env.readFile(new SerializedInputFormat[WeightVector], "path")

 // set the read model for the MLR algorithm
 mlr.weightsOption = model

 Cheers,
 Till
 ​

 On Tue, Mar 29, 2016 at 10:46 AM, Simone Robutti <
 simone.robu...@radicalbit.io> wrote:

> To my knowledge there is nothing like that. PMML is not supported in
> any form and there's no custom saving format yet. If you really need a
> quick and dirty solution, it's not that hard to serialize the model into a
> file.
>
> 2016-03-28 17:59 GMT+02:00 Sourigna Phetsarath <
> gna.phetsar...@teamaol.com>:
>
>> Flinksters,
>>
>> Is there an example of saving a Trained Model, loading a Trained
>> Model and then scoring one or more feature vectors using Flink ML?
>>
>> All of the examples I've seen have shown only sequential fit and
>> predict.
>>
>> Thank you.
>>
>> -Gna
>> --
>>
>>
>> *Gna Phetsarath*System Architect // AOL Platforms // Data Services
>> // Applied Research Chapter
>> 770 Broadway, 5th Floor, New York, NY 10003
>> o: 212.402.4871 // m: 917.373.7363
>> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>>
>> * *
>>
>
>

>>>
>>>
>>> --
>>>
>>>
>>> *Gna Phetsarath*System Architect // AOL Platforms // Data Services //
>>> Applied Research Chapter
>>> 770 Broadway, 5th Floor, New York, NY 10003
>>> o: 212.402.4871 // m: 917.373.7363
>>> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
>>>
>>> * *
>>>
>>
>>
>


Re: FromIteratorFunction problems

2016-04-08 Thread Andrew Whitaker
Thanks, that example is helpful. It seems that to use `fromCollection` with
an iterator, the iterator must implement Serializable, and Java's built-in
`Iterator`s don't, unfortunately.
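
For reference, here is a minimal sketch of an iterator that works with
fromCollection; the class and names are made up for illustration, and the only
requirement is that it implements both Iterator and Serializable:

import java.io.Serializable;
import java.util.Iterator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IteratorSourceExample {

    // A trivial iterator that is also Serializable, so the FromIteratorFunction
    // wrapping it can be serialized when the job is submitted.
    public static class SerializableRangeIterator implements Iterator<Integer>, Serializable {
        private final int end;
        private int next;

        public SerializableRangeIterator(int start, int end) {
            this.next = start;
            this.end = end;
        }

        @Override
        public boolean hasNext() {
            return next < end;
        }

        @Override
        public Integer next() {
            return next++;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(new SerializableRangeIterator(1, 3), Integer.class).print();
        env.execute("serializable-iterator-source");
    }
}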

On Thu, Apr 7, 2016 at 6:11 PM, Chesnay Schepler  wrote:

> hmm, maybe I was too quick with linking to the JIRA.
>
> As for an example: you can look at the streaming WindowJoin example. The
> sample data uses an Iterator. (ThrottledIterator)
> Note that the iterator implementation used is part of Flink and also
> implements Serializable.
>
> On 07.04.2016 22:18, Andrew Whitaker wrote:
>
> Hi,
>
> I'm trying to get a simple example of a source backed by an iterator
> working. Here's the code I've got:
>
> ```
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> List list = Arrays.asList(1, 2);
>
> env.fromCollection(list.iterator(), Integer.class).print();
> ```
>
> I get the following exception:
>
> ```
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: Object
> org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91
> not serializable
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)
> at braintree.demo.FromIterator.main(FromIterator.java:14)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
> ... 11 more
> ```
>
> This kind of makes sense. The root issue seems to be that the list's
> iterator is not serializable. In fact, java.util.Iterator doesn't implement
> Serializable.
>
> I can't seem to find any examples of `FromIteratorFunction` being used in
> the flink codebase. Am I using it wrong?
>
> Thanks!
>
> --
> Andrew Whitaker | andrew.whita...@braintreepayments.com
> --
> Note: this information is confidential. It is prohibited to share, post
> online or otherwise publicize without Braintree's prior written consent.
>
>
>


-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com
--
Note: this information is confidential. It is prohibited to share, post
online or otherwise publicize without Braintree's prior written consent.


Re: Yammer metrics

2016-04-08 Thread Sanne de Roever
OK, sounds nice; I didn't mean to impose: my train of thought was a bit
murky, my apologies for that. Although I have prior experience with JMX, and
none with Yammer, at this moment it is all new again. Cleaning up my
thinking a bit, the following picture, not directly Flink-related, comes up.

For production usage a complete export of metrics makes a lot of sense,
assuming that each individual metric is sound. In practice one would start
by graphing/monitoring general metrics. In case of an incident, additional
metrics stored in a system could help to explain system behavior and
provide valuable indicators for tuning or things to come in the future.
This would lead to graphing/monitoring additional metrics.

Cheers.

On Fri, Apr 8, 2016 at 1:15 PM, Chesnay Schepler  wrote:

> note that we still *could *expose the option of using the yammer
> reporters; there isn't any technical limitation as of now that would
> prohibit that.
>
>
> On 08.04.2016 13:05, Chesnay Schepler wrote:
>
> I'm very much aware of how Yammer works.
>
> As the slides you linked show (near the end) is that there are several
> small issues with the general-purpose reporters offered by yammer.
>
> Instead of hacking around those issues i would very much prefer creating
> our own reporters that are, again, *based* on the yammer reporters as the
> concept is sound, but can properly interact with the rest of the system.
>
> Long-term this will create cleaner code, easier debugging and allow us to
> adjust things as required at any time.
>
> On 08.04.2016 12:39, Sanne de Roever wrote:
>
> I forgot to add some extra information, still all tentative.
>
> Earlier (erm, 14 years ago to be honest), I also worked on a JMX
> monitoring system, and it turned out to be a pain to identify all the
> components, write the correct jmx queries and then to plot everything. It
> was hard. This presentation echos this sentiment, and proposes the Yammer
> route:
> http://www.slideshare.net/NaderGan/cassandra-jmxexpresshow-do-we-monitor-cassandra-using-graphite-leveraging-yammer-codahale-library
>
>
>
> On Fri, Apr 8, 2016 at 12:12 PM, Sanne de Roever <
> sanne.de.roe...@gmail.com> wrote:
>
>> Thanks Chesnay.
>>
>> Might I make a tentative case for Yammer? I'm not an expert, but I am
>> currently trying to pull together information on this and was reviewing
>> jmxtrans. This is all tentative, I've just dived in a few days ago. Please
>> find the information below.
>>
>> Using Yammer it is possible to load an MBean that queries all metrics and
>> exports them en masse to a target destination, statsd or graphite for
>> example, but any destination will to. The gist is that the metrics do not
>> have to be queried one by one, and that the export can be arranged by
>> loading an extra jar.
>>
>> An example of this setup can be seen at:
>> 
>> https://github.com/airbnb/kafka-statsd-metrics2
>>
>> Put the MBean jar in the classpath, add a config file, and presto. This
>> would work for Kafka, and Cassandra, and I would not need a separate
>> JMXtrans server.
>>
>>
>>
>>
>> On Fri, Apr 8, 2016 at 11:01 AM, Chesnay Schepler < 
>> ches...@apache.org> wrote:
>>
>>> Flink currently doesn't expose any metrics beyond those shown in the
>>> Dashboard.
>>>
>>> I am currently working on integrating a new metrics system that is
>>> partly *based* on Yammer/Codahale/Dropwizard metrics.
>>>
>>> For a first version it is planned to export metrics only via JMX as this
>>> effectively covers all use-cases with the help of JMXTrans and similar
>>> tools.
>>>
>>> Reporting to specific systems is something we want to add as well though.
>>>
>>> Regards,
>>> Chesnay Schepler
>>>
>>>
>>> On 08.04.2016 09:32, Sanne de Roever wrote:
>>>
>>> Hi,
>>>
>>> I´m looking into setting up monitoring for our (Flink) environment and
>>> realized that both Kafka and Cassandra use the yammer metrics library. This
>>> library enables the direct export of all metrics to Graphite (and possibly
>>> statsd). Does Flink use Yammer metrics?
>>>
>>> Cheers,
>>>
>>> Sanne
>>>
>>>
>>>
>>
>
>
>


Re: Handling large state (incremental snapshot?)

2016-04-08 Thread Hironori Ogibayashi
Thank you for your suggestion,

Regarding throughput, there was actually a bottleneck at the process
which puts logs into Kafka.
When I added more processes, the throughput increased.

Also, HyperLogLog seems like a good solution in this case. I will try it.
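
For illustration, a rough Java sketch of that idea, assuming the clearspring
stream-lib HyperLogLog (that library's API and the 1% precision parameter are
assumptions, and `values` stands for the stream of distinct-counted keys); the
rest of the pipeline, including the trigger, would stay as in the quoted job
below:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;

DataStream<Long> approxDistinct = values
    .timeWindowAll(Time.of(10, TimeUnit.DAYS))
    // keep a fixed-size sketch as window state instead of the full set of values
    .fold(new HyperLogLog(0.01), new FoldFunction<String, HyperLogLog>() {
        @Override
        public HyperLogLog fold(HyperLogLog acc, String value) {
            acc.offer(value);
            return acc;
        }
    })
    .map(new MapFunction<HyperLogLog, Long>() {
        @Override
        public Long map(HyperLogLog hll) {
            return hll.cardinality(); // approximate distinct count
        }
    });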

Regards,
Hironori

2016-04-07 17:45 GMT+09:00 Aljoscha Krettek :
> Ah yes, you're right. With the non-keyed stream it doesn't make a big
> difference because it's only one big state value.
>
> The throughput still seems quite low. Have you ever tried looking at the
> "back pressure" tab on the Flink dashboard. For this I would suggest to
> disable chaining, so that every operator is run in an isolated task:
>
> env.disableOperatorChaining();
>
> On Thu, 7 Apr 2016 at 05:11 Hironori Ogibayashi 
> wrote:
>>
>> I tried RocksDB, but the result was almost the same.
>>
>> I used the following code and put 2.6million distinct records into Kafka.
>> After processing all records, the state on the HDFS become about 250MB
>> and time needed for
>> the checkpoint was almost 5sec. Processing throughput was
>> FsStateBackend -> 8000msg/sec, RocksDBStateBackend -> 9000msg/sec
>>
>> ---
>> env.setStateBackend(new
>> RocksDBStateBackend("hdfs://:8020/apps/flink/checkpoints"));
>>
>> val stream = env
>>   .addSource(new FlinkKafkaConsumer09[String]("kafka.json2", new
>> SimpleStringSchema(), properties))
>>   .map(parseJson(_))
>>   .timeWindowAll(Time.of(10, TimeUnit.DAYS))
>>   .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>   // count distinct values
>>   .fold(Set[String]()){(r,i) => { r + i}}
>>   .map{x => (System.currentTimeMillis(), x.size)}
>>   .addSink(new ElasticsearchSink(config, transports, new
>> IndexRequestBuilder[Tuple2[Long, Int]]  {
>> override def createIndexRequest(element: Tuple2[Long, Int],
>> ctx: RuntimeContext): IndexRequest = {
>>   val json = new HashMap[String, AnyRef]
>>   json.put("@timestamp", new Timestamp(element._1))
>>   json.put("count", element._2: java.lang.Integer)
>>
>> Requests.indexRequest.index("dummy3").`type`("my-type").source(json)
>> }
>>   }))
>> ---
>>
>> I guess this is because I used non-keyed stream, so I had one state
>> record with a big value (all distinct value).
>> I think copying all 250MB(or more) file to HDFS in every checkpoint
>> will be heavy, so I will try storing the distinct values
>> in the external datastore (e.g. redis).
>> Also, when incremental snapshot get implemented, I want to try.
>>
>> Regards,
>> Hironori
>>
>> 2016-04-05 21:40 GMT+09:00 Hironori Ogibayashi :
>> > Aljoscha,
>> >
>> > Thank you for your quick response.
>> > Yes, I am using FsStateBackend, so I will try RocksDB backend.
>> >
>> > Regards,
>> > Hironori
>> >
>> > 2016-04-05 21:23 GMT+09:00 Aljoscha Krettek :
>> >> Hi,
>> >> I guess you are using the FsStateBackend, is that correct? You could
>> >> try
>> >> using the RocksDB state backend:
>> >>
>> >> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html#the-rocksdbstatebackend
>> >>
>> >> With this, throughput will be lower but the overhead per checkpoint
>> >> could be
>> >> lower. Also, with this most of the file copying necessary for the
>> >> checkpoint
>> >> will be done while data processing keeps running (asynchronous
>> >> snapshot).
>> >>
>> >> As to incremental snapshots. I'm afraid this feature is not yet
>> >> implemented
>> >> but we're working on it.
>> >>
>> >> Cheers,
>> >> Aljoscha
>> >>
>> >> On Tue, 5 Apr 2016 at 14:06 Hironori Ogibayashi 
>> >> wrote:
>> >>>
>> >>> Hello,
>> >>>
>> >>> I am trying to implement windowed distinct count on a stream. In this
>> >>> case, the state
>> >>> have to hold all distinct value in the window, so can be large.
>> >>>
>> >>> In my test, if the state size become about 400MB, checkpointing takes
>> >>> 40sec and spends most of Taskmanager's CPU.
>> >>> Are there any good way to handle this situation?
>> >>>
>> >>> Flink document mentions about incremental snapshot, and I am
>> >>> interested in
>> >>> it,
>> >>> but could not find how to enable it. (not implemented yet?)
>> >>>
>> >>>
>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
>> >>>
>> >>> Regards,
>> >>> Hironori


Re: Yammer metrics

2016-04-08 Thread Chesnay Schepler
Note that we still *could* expose the option of using the Yammer
reporters; there isn't any technical limitation as of now that would
prohibit that.


On 08.04.2016 13:05, Chesnay Schepler wrote:

I'm very much aware of how Yammer works.

As the slides you linked show (near the end) is that there are several 
small issues with the general-purpose reporters offered by yammer.


Instead of hacking around those issues i would very much prefer 
creating our own reporters that are, again, /based/ on the yammer 
reporters as the concept is sound, but can properly interact with the 
rest of the system.


Long-term this will create cleaner code, easier debugging and allow us 
to adjust things as required at any time.


On 08.04.2016 12:39, Sanne de Roever wrote:

I forgot to add some extra information, still all tentative.

Earlier (erm, 14 years ago to be honest), I also worked on a JMX 
monitoring system, and it turned out to be a pain to identify all the 
components, write the correct jmx queries and then to plot 
everything. It was hard. This presentation echos this sentiment, and 
proposes the Yammer route: 
http://www.slideshare.net/NaderGan/cassandra-jmxexpresshow-do-we-monitor-cassandra-using-graphite-leveraging-yammer-codahale-library 





On Fri, Apr 8, 2016 at 12:12 PM, Sanne de Roever 
> wrote:


Thanks Chesnay.

Might I make a tentative case for Yammer? I'm not an expert, but
I am currently trying to pull together information on this and
was reviewing jmxtrans. This is all tentative, I've just dived in
a few days ago. Please find the information below.

Using Yammer it is possible to load an MBean that queries all
metrics and exports them en masse to a target destination, statsd
or graphite for example, but any destination will to. The gist is
that the metrics do not have to be queried one by one, and that
the export can be arranged by loading an extra jar.

An example of this setup can be seen at:
https://github.com/airbnb/kafka-statsd-metrics2

Put the MBean jar in the classpath, add a config file, and
presto. This would work for Kafka, and Cassandra, and I would not
need a separate JMXtrans server.




On Fri, Apr 8, 2016 at 11:01 AM, Chesnay Schepler
 wrote:

Flink currently doesn't expose any metrics beyond those shown
in the Dashboard.

I am currently working on integrating a new metrics system
that is partly /based/ on Yammer/Codahale/Dropwizard metrics.

For a first version it is planned to export metrics only via
JMX as this effectively covers all use-cases with the help of
JMXTrans and similar tools.

Reporting to specific systems is something we want to add as
well though.

Regards,
Chesnay Schepler


On 08.04.2016 09:32, Sanne de Roever wrote:

Hi,

I´m looking into setting up monitoring for our (Flink)
environment and realized that both Kafka and Cassandra use
the yammer metrics library. This library enables the direct
export of all metrics to Graphite (and possibly statsd).
Does Flink use Yammer metrics?

Cheers,

Sanne










Re: Yammer metrics

2016-04-08 Thread Chesnay Schepler

I'm very much aware of how Yammer works.

As the slides you linked show (near the end), there are several
small issues with the general-purpose reporters offered by Yammer.

Instead of hacking around those issues I would very much prefer creating
our own reporters that are, again, *based* on the Yammer reporters, as
the concept is sound, but can properly interact with the rest of the system.


Long-term this will create cleaner code, make debugging easier, and allow us
to adjust things as required at any time.


On 08.04.2016 12:39, Sanne de Roever wrote:

I forgot to add some extra information, still all tentative.

Earlier (erm, 14 years ago to be honest), I also worked on a JMX 
monitoring system, and it turned out to be a pain to identify all the 
components, write the correct jmx queries and then to plot everything. 
It was hard. This presentation echos this sentiment, and proposes the 
Yammer route: 
http://www.slideshare.net/NaderGan/cassandra-jmxexpresshow-do-we-monitor-cassandra-using-graphite-leveraging-yammer-codahale-library 





On Fri, Apr 8, 2016 at 12:12 PM, Sanne de Roever 
> wrote:


Thanks Chesnay.

Might I make a tentative case for Yammer? I'm not an expert, but I
am currently trying to pull together information on this and was
reviewing jmxtrans. This is all tentative, I've just dived in a
few days ago. Please find the information below.

Using Yammer it is possible to load an MBean that queries all
metrics and exports them en masse to a target destination, statsd
or graphite for example, but any destination will to. The gist is
that the metrics do not have to be queried one by one, and that
the export can be arranged by loading an extra jar.

An example of this setup can be seen at:
https://github.com/airbnb/kafka-statsd-metrics2

Put the MBean jar in the classpath, add a config file, and presto.
This would work for Kafka, and Cassandra, and I would not need a
separate JMXtrans server.




On Fri, Apr 8, 2016 at 11:01 AM, Chesnay Schepler
> wrote:

Flink currently doesn't expose any metrics beyond those shown
in the Dashboard.

I am currently working on integrating a new metrics system
that is partly /based/ on Yammer/Codahale/Dropwizard metrics.

For a first version it is planned to export metrics only via
JMX as this effectively covers all use-cases with the help of
JMXTrans and similar tools.

Reporting to specific systems is something we want to add as
well though.

Regards,
Chesnay Schepler


On 08.04.2016 09:32, Sanne de Roever wrote:

Hi,

I´m looking into setting up monitoring for our (Flink)
environment and realized that both Kafka and Cassandra use
the yammer metrics library. This library enables the direct
export of all metrics to Graphite (and possibly statsd). Does
Flink use Yammer metrics?

Cheers,

Sanne








Re: Yammer metrics

2016-04-08 Thread Sanne de Roever
I forgot to add some extra information, still all tentative.

Earlier (erm, 14 years ago to be honest), I also worked on a JMX monitoring
system, and it turned out to be a pain to identify all the components,
write the correct JMX queries, and then plot everything. It was hard.
This presentation echoes this sentiment, and proposes the Yammer route:
http://www.slideshare.net/NaderGan/cassandra-jmxexpresshow-do-we-monitor-cassandra-using-graphite-leveraging-yammer-codahale-library



On Fri, Apr 8, 2016 at 12:12 PM, Sanne de Roever 
wrote:

> Thanks Chesnay.
>
> Might I make a tentative case for Yammer? I'm not an expert, but I am
> currently trying to pull together information on this and was reviewing
> jmxtrans. This is all tentative, I've just dived in a few days ago. Please
> find the information below.
>
> Using Yammer it is possible to load an MBean that queries all metrics and
> exports them en masse to a target destination, statsd or graphite for
> example, but any destination will to. The gist is that the metrics do not
> have to be queried one by one, and that the export can be arranged by
> loading an extra jar.
>
> An example of this setup can be seen at:
> https://github.com/airbnb/kafka-statsd-metrics2
>
> Put the MBean jar in the classpath, add a config file, and presto. This
> would work for Kafka, and Cassandra, and I would not need a separate
> JMXtrans server.
>
>
>
>
> On Fri, Apr 8, 2016 at 11:01 AM, Chesnay Schepler 
> wrote:
>
>> Flink currently doesn't expose any metrics beyond those shown in the
>> Dashboard.
>>
>> I am currently working on integrating a new metrics system that is partly
>> *based* on Yammer/Codahale/Dropwizard metrics.
>>
>> For a first version it is planned to export metrics only via JMX as this
>> effectively covers all use-cases with the help of JMXTrans and similar
>> tools.
>>
>> Reporting to specific systems is something we want to add as well though.
>>
>> Regards,
>> Chesnay Schepler
>>
>>
>> On 08.04.2016 09:32, Sanne de Roever wrote:
>>
>> Hi,
>>
>> I´m looking into setting up monitoring for our (Flink) environment and
>> realized that both Kafka and Cassandra use the yammer metrics library. This
>> library enables the direct export of all metrics to Graphite (and possibly
>> statsd). Does Flink use Yammer metrics?
>>
>> Cheers,
>>
>> Sanne
>>
>>
>>
>


Re: Yammer metrics

2016-04-08 Thread Sanne de Roever
Thanks Chesnay.

Might I make a tentative case for Yammer? I'm not an expert, but I am
currently trying to pull together information on this and was reviewing
jmxtrans. This is all tentative, I've just dived in a few days ago. Please
find the information below.

Using Yammer it is possible to load an MBean that queries all metrics and
exports them en masse to a target destination, statsd or graphite for
example, but any destination will do. The gist is that the metrics do not
have to be queried one by one, and that the export can be arranged by
loading an extra jar.

An example of this setup can be seen at:
https://github.com/airbnb/kafka-statsd-metrics2

Put the MBean jar in the classpath, add a config file, and presto. This
would work for Kafka and Cassandra, and I would not need a separate
JMXtrans server.




On Fri, Apr 8, 2016 at 11:01 AM, Chesnay Schepler 
wrote:

> Flink currently doesn't expose any metrics beyond those shown in the
> Dashboard.
>
> I am currently working on integrating a new metrics system that is partly
> *based* on Yammer/Codahale/Dropwizard metrics.
>
> For a first version it is planned to export metrics only via JMX as this
> effectively covers all use-cases with the help of JMXTrans and similar
> tools.
>
> Reporting to specific systems is something we want to add as well though.
>
> Regards,
> Chesnay Schepler
>
>
> On 08.04.2016 09:32, Sanne de Roever wrote:
>
> Hi,
>
> I´m looking into setting up monitoring for our (Flink) environment and
> realized that both Kafka and Cassandra use the yammer metrics library. This
> library enables the direct export of all metrics to Graphite (and possibly
> statsd). Does Flink use Yammer metrics?
>
> Cheers,
>
> Sanne
>
>
>


Re: Joda DateTimeSerializer

2016-04-08 Thread Robert Metzger
Hi Stefano,

your fix is the right way to resolve the issue ;)

If you want, give me your Confluence Wiki username and I'll give you edit
permissions in our wiki. Otherwise, I'll quickly add a note to the
migration guide.

On Fri, Apr 8, 2016 at 11:28 AM, Stefano Bortoli 
wrote:

> Hi to all,
> we've just upgraded to Flink 1.0.0 and we had some problems with joda
> DateTime serialization.
> The problem was caused by Flink-3305 that removed the JavaKaffee
> dependency.
> We had to re-add such dependency in our application and then register the
> DateTime serializer in the environment:
>
> env.registerTypeWithKryoSerializer(DateTime.class,
> JodaDateTimeSerializer.class );
>
> and in the pom.xml of course (checking compatibility with Flink's Kryo
> version that is 2.24.0):
>
> <dependency>
>     <groupId>de.javakaffee</groupId>
>     <artifactId>kryo-serializers</artifactId>
>     <version>0.28</version>
> </dependency>
>
> We didn't see a mention to this problem in the migration guide, I think it
> should be added.
>
> Best,
> Stefano
>


Re: Integrate Flink with S3 on EMR cluster

2016-04-08 Thread Robert Metzger
Hi Timur,

the Flink optimizer runs on the client, so the exception is thrown from the
JVM running the ./bin/flink client.
Since the statistics sampling is an optional step, it's surrounded by a
try/catch block that just logs the error message.

More answers inline below


On Thu, Apr 7, 2016 at 11:32 PM, Timur Fayruzov 
wrote:

> The exception does not show up in the console when I run the job, it only
> shows in the logs. I thought it means that it happens either on AM or TM (I
> assume what I see in stdout is client log). Is my thinking right?
>
>
> On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi  wrote:
>
>> Hey Timur,
>>
>> Just had a chat with Robert about this. I agree that the error message
>> is confusing, but it is fine it this case. The file system classes are
>> not on the class path of the client process, which is submitting the
>> job.
>
> Do you mean that classes should be in the classpath of
> `org.apache.flink.client.CliFrontend` class that `bin/flink` executes? I
> tried to add EMRFS jars to this classpath but it did not help. BTW, it
> seems that bin/flink script assumes that HADOOP_CLASSPATH is set, which is
> not the case in my EMR setup. The HADOOP_CLASSPATH seem to be the only
> point that I control here to add to classpath, so I had to set it manually.
>

Yes, they have to be in the classpath of the CliFrontend.
The client should also work without the HADOOP_CLASSPATH being set. It's
optional for cases where you want to manually add jars to the classpath.
For example, on Google Compute they set the HADOOP_CLASSPATH.

Please note that we are not transferring the contents of the
HADOOP_CLASSPATH to the other workers on the cluster. So you have to set
the HADOOP_CLASSPATH on all machines.
Another approach is just putting the required jar into the "lib/" folder of
your Flink installation (the folder is next to "bin/", "conf/", "logs/").


>
>
>> It fails to sample the input file sizes, but this is just an
>> optimization step and hence it does not fail the job and only logs the
>> error.
>>
> Is this optimization only for client side? In other words, does it affect
> Flink's ability to choose proper type of a join?
>

Your DataSet program is translated into a generic representation. Then,
this representation is passed into the optimizer, which decides on join /
sorting / data shipping strategies. The output of the optimizer is sent to
the JobManager for execution.
If the optimizer is not able to get good statistics about the input (like
in your case), it will default to robust execution strategies. I don't know
the input sizes of your job and the structure of your job, but chances are
high that the final plan is the same with and without the input statistics.
Only in cases where one join side is very small the input statistics might
be relevant.
Other optimizations, such as reusing existing data partitioning or ordering,
work independently of the input sampling.


>
>
>>
>> After the job is submitted everything should run as expected.
>>
>> You should be able to get rid of that exception by adding the missing
>> classes to the class path of the client process (started via
>> bin/flink), for example via the lib folder.
>>
> The above approach did not work, could you elaborate what you meant by
> 'lib folder'?
>

See above.



>
> Thanks,
> Timur
>
>
>> – Ufuk
>>
>>
>>
>>
>> On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov 
>> wrote:
>> > There's one more filesystem integration failure that I have found. My
>> job on
>> > a toy dataset succeeds, but Flink log contains the following message:
>> > 2016-04-07 18:10:01,339 ERROR
>> > org.apache.flink.api.common.io.DelimitedInputFormat   -
>> Unexpected
>> > problen while getting the file statistics for file 's3://...':
>> > java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
>> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>> > java.lang.RuntimeException: java.lang.RuntimeException:
>> > java.lang.ClassNotFoundException: Class
>> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>> > at
>> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>> > at
>> >
>> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>> > at
>> >
>> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
>> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>> > at
>> >
>> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
>> > at
>> >
>> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
>> > at
>> >
>> org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
>> > at
>> >
>> 

Joda DateTimeSerializer

2016-04-08 Thread Stefano Bortoli
Hi to all,
we've just upgraded to Flink 1.0.0 and we had some problems with Joda
DateTime serialization.
The problem was caused by FLINK-3305, which removed the JavaKaffee dependency.
We had to re-add that dependency in our application and then register the
DateTime serializer in the environment:

env.registerTypeWithKryoSerializer(DateTime.class,
JodaDateTimeSerializer.class );

and in the pom.xml of course (checking compatibility with Flink's Kryo
version, which is 2.24.0):

<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.28</version>
</dependency>

We didn't see a mention of this problem in the migration guide; I think it
should be added.

Best,
Stefano


Re: Yammer metrics

2016-04-08 Thread Chesnay Schepler
Flink currently doesn't expose any metrics beyond those shown in the 
Dashboard.


I am currently working on integrating a new metrics system that is
partly *based* on Yammer/Codahale/Dropwizard metrics.


For a first version it is planned to export metrics only via JMX as this 
effectively covers all use-cases with the help of JMXTrans and similar 
tools.


Reporting to specific systems is something we want to add as well though.

Regards,
Chesnay Schepler

On 08.04.2016 09:32, Sanne de Roever wrote:

Hi,

I´m looking into setting up monitoring for our (Flink) environment and 
realized that both Kafka and Cassandra use the yammer metrics library. 
This library enables the direct export of all metrics to Graphite (and 
possibly statsd). Does Flink use Yammer metrics?


Cheers,

Sanne




Yammer metrics

2016-04-08 Thread Sanne de Roever
Hi,

I'm looking into setting up monitoring for our (Flink) environment and
realized that both Kafka and Cassandra use the Yammer metrics library. This
library enables the direct export of all metrics to Graphite (and possibly
statsd). Does Flink use Yammer metrics?

Cheers,

Sanne