Re: Continuing from the stackoverflow post

2015-11-27 Thread Nirmalya Sengupta
Hello Fabian,


A little long mail; please have some patience.

From your response: ' Let's start by telling me what you actually want to
do ;-) '

At a broad level, I want to write a tutorial on Flink (perhaps a series),
where these concepts are brought out by a mix of definition, elaboration,
illustration and, of course, code snippets. If that helps the community, I
will be very happy. At the very least, I will understand the principles and
their application much better; so I am perhaps a bit selfish here. You
also mention that you are preparing some such material. If I can
complement your effort, I will be delighted.

One never knows: going further, I may become a trainer / evangelist of
Apache Flink, if I show enough grasp of the subject! :-)

Now to this particular question (from SOF):

When I began, my intention was to find the maximum temperature of *every 5
successive records* (to answer your question).

As I have said before, I am learning; hence, I am trying various operator
combinations on the same set of data to see what happens, and then trying
to explain why it happens.

Let's refer to the code again:

val readings =
  readIncomingReadings(env,"./sampleIOTTiny.csv")
  .map(e => (e.sensorUUID,e.ambientTemperature))
  .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
  .trigger(CountTrigger.of(5))
  .evictor(CountEvictor.of(4))
  .maxBy(1)


So, what I understand is this:

timeWindowAll defines a pane of 5 msecs. When this time expires, the
timeWindowAll fires a kind of *onExpirationOf* trigger (I have fabricated
the name; quite likely it doesn't exist). This perhaps does nothing other
than passing the contents of the window (whatever number of elements have
been collected in the last 5 msecs) to the function (here, *maxBy()*) and
clearing the pane, readying it for the next 5 msecs (not exactly, but more
on that later).

However, I provide a CountTrigger (5 elements). According to the rules of
Flink, this trigger replaces the aforementioned default onExpirationOf
trigger. Therefore, when timeWindowAll is ready after 5 msecs have passed,
what it finds available to fire is this CountTrigger. However, a
CountTrigger can fire only if its count-related (here, 5) criterion is
satisfied. So, after 5 msecs have passed, if the number of elements
collected in timeWindowAll pane is >= 5, *only* then CountTrigger will
fire; otherwise, CountTrigger will not stir and timeWindowAll will shrug
its shoulders and go back to wait for the next 5 msecs period.

Going further, I provide a CountEvictor. According to the rules of Flink,
an Evictor is allowed to act only when its associated trigger (here,
CountTrigger) fires. Because of its presence, a further check is made on
the contents of the pane. If CountTrigger has fired, the number of elements
collected in the pane must be >= 5. However, the evictor is interested only
in the first 4 of them. The evictor *takes away* these 4 from timeWindowAll's
pane and gives them to the function. The 5th element still remains in the
pane. timeWindowAll readies itself for the next 5 msecs, but its pane is not
empty this time; it still has that solitary element.

This much seems straightforward but there is a twist in the tale.

A very important point about timeWindowAll's pane is its ephemeral nature.
When I specify timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS)), Flink
understands this as an instruction to create a pane every successive 5
msecs period. Flink doesn't create one pane which is cleared after every 5
msecs (this is what I inexactly mentioned earlier), and readied for the
next 5 msecs. Instead, it brings into existence a *freshly minted pane,*
every 5 msecs. The contents of the preceding pane are subjected to trigger
(if it exists) and evictor (if it exists) and finally, function (if it is
provided). Then, Flink *dumps the preceding pane* along with its contents,
if any, and readies the new pane, awaiting elements during the next 5 msecs.

In my case above, the criteria of timeWindowAll and Trigger/Evictor are not
locked in step as precisely as they should have been. It is quite possible
that while CountTrigger fires because 5 elements are already in the pane, 5
msecs are *yet to lapse*. So, the current pane of timeWindowAll is still
alive and is collecting subsequent elements arriving. The evictor takes
away 4 elements. The remaining element is joined by a few more *before* 5
msecs lapse. After 5 msecs have lapsed, Flink extirpates the pane - along
with its current contents - and creates a fresh pane. In effect, some
elements which arrive and are collected in the pane *never reach* the
trigger/evictor pair. These unfortunate elements are destroyed along with
the pane in which they reside. Obviously, this affects the output that I
see. The calculation of maximum temperature is inaccurate simply because
some of the temperature readings are never available to the _maxBy_
function.

Have I got it almost correct? I will be keen to hear from you.

-- Nirmal

Re: Interpretation of Trigger and Eviction on a window

2015-11-27 Thread Nirmalya Sengupta
Hello Fabian,

From your reply to this thread:
' it is correct that the evictor is called BEFORE the window function is
applied because this is required to support certain types of sliding
windows. '

This is clear to me now. However, my point was about the way it is
described in the User-guide. The guide says this:
' *After the trigger fires, and before the function (e.g., sum, count) is
applied to the window contents, an optional Evictor removes some elements
from the beginning of the window before the remaining elements are passed
on to the function* '

As I read it again, I see where the problem lies. It says some elements are
removed before the **rest** are passed to the function. This is not what
happens, I think. The Evictor removes elements, and the function *sees this
set of removed elements, not the remaining elements*. The remaining elements
stay in the window and are perhaps picked up by the Evictor next time.

Carrying on from your elaboration, I think the guide's statement could be
rearranged as:

' *After the trigger fires, the function (e.g., sum, count) is applied to
the entire contents of the window. However, an optionally
provided Evictor removes some elements from the beginning of the window,
according to the criteria of eviction. The function is then applied to this
set of __removed__ elements.* '

Let me know if I am way off the mark here.

-- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Get an aggregator's value outside of an iteration

2015-11-27 Thread Truong Duc Kien

Hi,

I'm looking for a way to get the value of aggregators outside of an iteration.
Specifically, I want the final aggregators' values after the iteration
has finished. Is there any API for that?


Thanks,
Kien Truong


Re: Insufficient number of network buffers running on cluster

2015-11-27 Thread Fabian Hueske
Hi Guido,

please check this section of the configuration documentation:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-the-network-buffers

It should answer your questions. Please let us know if not.
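
In short, the value is set in flink-conf.yaml. A minimal sketch (the numbers
are purely illustrative; the linked page gives a rule of thumb of roughly
#slots-per-TM^2 * #TMs * 4 buffers, so the right value depends on your cluster):

    # flink-conf.yaml
    # rough rule of thumb from the docs: slots-per-TM^2 * number-of-TMs * 4
    # e.g. 8 slots per TaskManager and 10 TaskManagers: 8 * 8 * 10 * 4 = 2560
    taskmanager.network.numberOfBuffers: 2560

One thing to keep in mind: the TaskManagers read this value at startup, so
the cluster needs to be restarted for a change to take effect.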

Cheers, Fabian

2015-11-27 16:41 GMT+01:00 Guido :

> Hello,
>
> I would like to ask you something regarding an error I’m facing running
> java code over the cluster at DIMA.
>
> Caused by: java.io.IOException: Insufficient number of network buffers:
> required 300, but only 161 available. The total number of network buffers
> is currently set to 70480. You can increase this number by setting the
> configuration key 'taskmanager.network.numberOfBuffers'.
> at
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:176)
> at
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:313)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
> at java.lang.Thread.run(Thread.java:853)
>
> The error seems pretty clear, so as suggested I increased the
> taskmanager.network.numberOfBuffers in flink-conf.yaml (it was first
> set to 35240), but this didn't solve the problem, and the
> number of available buffers is almost always the same.
> Could you explain which further configuration options I should check on
> the cluster?
> Besides solving this issue, I would be interested in understanding why I get
> this error and where these values come from.
>
> Thanks in advance for your time,
>
> Guido
>


Insufficient number of network buffers running on cluster

2015-11-27 Thread Guido
Hello,

I would like to ask you something regarding an error I’m facing running java 
code over the cluster at DIMA.

Caused by: java.io.IOException: Insufficient number of network buffers: 
required 300, but only 161 available. The total number of network buffers is 
currently set to 70480. You can increase this number by setting the 
configuration key 'taskmanager.network.numberOfBuffers'.
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:176)
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:313)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:464)
at java.lang.Thread.run(Thread.java:853)

The error seems pretty clear, so as suggested I increased the
taskmanager.network.numberOfBuffers in flink-conf.yaml (it was first set
to 35240), but this didn't solve the problem, and the number of
available buffers is almost always the same.
Could you explain which further configuration options I should check on the
cluster?
Besides solving this issue, I would be interested in understanding why I get
this error and where these values come from.

Thanks in advance for your time,

Guido

Re: Working with State example /flink streaming

2015-11-27 Thread Lopez, Javier
Hi,

Thanks for the example. We have done it with windows before and it works.
We are using state because the data comes with a gap of several days and we
can't handle a window size of several days. That's why we decided to use
the state.

On 27 November 2015 at 11:09, Aljoscha Krettek  wrote:

> Hi,
> I’ll try to go into a bit more detail about the windows here. What you can
> do is this:
>
> DataStream<Tuple3<Long, Double, Long>> input = … // fields are (id, sum,
> count), where count is initialized to 1, similar to word count
>
> DataStream<Tuple3<Long, Double, Long>> counts = input
>   .keyBy(0)
>   .timeWindow(Time.minutes(10))
>   .reduce(new MyCountingReducer())
>
> DataStream<Tuple2<Long, Double>> result = counts.map(<map function that
> divides sum by count>)
>
> Does this help? Here, you don’t even have to deal with state, the
> windowing system will keep the state (i.e. the reduced) value in internal
> state in a fault tolerant fashion.
>
> Cheers,
> Aljoscha
> > On 26 Nov 2015, at 17:14, Stephan Ewen  wrote:
> >
> > Hi!
> >
> > In streaming, there is no "end" of the stream when you would emit the
> final sum. That's why there are windows.
> >
> > If you do not want the partial sums, but only the final sum, you need to
> define what window in which the sum is computed. At the end of that window,
> that value is emitted. The window can be based on time, counts, or other
> measures.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Thu, Nov 26, 2015 at 4:07 PM, Lopez, Javier 
> wrote:
> > Hi, thanks for the answer. It worked but not in the way we expected. We
> expect to have only one sum per ID and we are getting all the consecutive
> sums, for example:
> >
> > We expect this: (11,6) but we get this (11,1) (11,3) (11,6) (the initial
> values are ID -> 11, values -> 1,2,3). Here is the code we are using for
> our test:
> >
> > DataStream<Tuple2<Long, Double>> stream = ...;
> >
> >
> > DataStream<Tuple4<Long, Double, Long, Double>> result =
> stream.keyBy(0).map(new RollingSum());
> >
> >
> >
> > public static class RollingSum extends RichMapFunction<Tuple2<Long,
> Double>, Tuple4<Long, Double, Long, Double>> {
> >
> >     // persistent counters
> >     private OperatorState<Double> sum;
> >     private OperatorState<Long> count;
> >
> >     @Override
> >     public Tuple4<Long, Double, Long, Double> map(Tuple2<Long, Double> value1) {
> >         try {
> >             Double newSum = sum.value() + value1.f1;
> >             sum.update(newSum);
> >             count.update(count.value() + 1);
> >             return new Tuple4<Long, Double, Long, Double>(
> >                 value1.f0, sum.value(), count.value(), sum.value() / count.value());
> >         } catch (IOException e) {
> >             e.printStackTrace();
> >         }
> >         return null;
> >     }
> >
> >     @Override
> >     public void open(Configuration config) {
> >         sum = getRuntimeContext().getKeyValueState("mySum", Double.class, 0D);
> >         count = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
> >     }
> > }
> >
> >
> > We are using a Tuple4 because we want to calculate the sum and the
> average (so our Tuple is ID, SUM, Count, AVG). Do we need to add another
> step to get a single value out of it, or is this the expected behavior?
> >
> > Thanks again for your help.
> >
> > On 25 November 2015 at 17:19, Stephan Ewen  wrote:
> > Hi Javier!
> >
> > You can solve this both using windows, or using manual state.
> >
> > What is better depends a bit on when you want to have the result (the
> sum). Do you want a result emitted after each update (or do some other
> operation with that value) or do you want only the final sum after a
> certain time?
> >
> > For the second variant, I would use a window, for the first variant, you
> could use custom state as follows:
> >
> > For each element, you take the current state for the key, add the value
> to get the new sum. Then you update the state with the new sum and emit the
> value as well...
> >
> > Java:
> >
> > DataStream<Tuple2<String, Long>> stream = ...;
> >
> >
> > DataStream<Tuple2<String, Long>> result = stream.keyBy(0).map(new
> RollingSum());
> >
> >
> > public class RollingSum extends RichMapFunction<Tuple2<String, Long>,
> Tuple2<String, Long>> {
> >
> >     private OperatorState<Long> sum;
> >
> >     @Override
> >     public Tuple2<String, Long> map(Tuple2<String, Long> value) {
> >         long newSum = sum.value() + value.f1;
> >         sum.update(newSum);
> >         return new Tuple2<>(value.f0, newSum);
> >     }
> >
> >     @Override
> >     public void open(Configuration config) {
> >         sum = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
> >     }
> > }
> >
> >
> > In Scala, you can write this briefly as:
> >
> > val stream: DataStream[(String, Int)] = ...
> >
> > val counts: DataStream[(String, Int)] = stream
> >   .keyBy(_._1)
> >   .mapWithState((in: (String, Int), sum: Option[Int]) => {
> >     val newSum = in._2 + sum.getOrElse(0)
> >     ( (in._1, newSum), Some(newSum) )
> >   })

Re: POJO Dataset read and write

2015-11-27 Thread Flavio Pompermaier
I was expecting Parquet + Thrift to perform faster, but I wasn't expecting
that big a difference; I just wanted to know whether my results were right.
Thanks for the moment, Fabian!

On Fri, Nov 27, 2015 at 4:22 PM, Fabian Hueske  wrote:

> Parquet is much cleverer than the TypeSerializer and applies columnar
> storage and compression techniques.
> The TypeSerializerIOFs just use Flink's element-wise serializers to write
> and read binary data.
>
> I'd go with Parquet if it is working well for you.
>
> 2015-11-27 16:15 GMT+01:00 Flavio Pompermaier :
>
>> I made a simple test comparing Parquet + Thrift vs the TypeSerializer IF/OF:
>> the former outperformed the latter for a simple filter (not pushed
>> down) and a map+sum (something like 2 s vs 33 s, not counting disk
>> space usage, which is much worse). Is that normal, or is TypeSerializer
>> supposed to perform better than this?
>>
>>
>> On Fri, Nov 27, 2015 at 3:39 PM, Fabian Hueske  wrote:
>>
>>> If you are just looking for an exchange format between two Flink jobs, I
>>> would go for the TypeSerializerInput/OutputFormat.
>>> Note that these are binary formats.
>>>
>>> Best, Fabian
>>>
>>> 2015-11-27 15:28 GMT+01:00 Flavio Pompermaier :
>>>
 Hi to all,

 I have a complex POJO (with nested objects) that I'd like to write and
 read with Flink (batch).
 What is the simplest way to do that? I can't find any example of it :(

 Best,
 Flavio

>>>
>>>
>>
>


Re: POJO Dataset read and write

2015-11-27 Thread Fabian Hueske
Parquet is much cleverer than the TypeSerializer and applies columnar
storage and compression techniques.
The TypeSerializerIOFs just use Flink's element-wise serializers to write
and read binary data.

I'd go with Parquet if it is working well for you.

2015-11-27 16:15 GMT+01:00 Flavio Pompermaier :

> I made a simple test comparing Parquet + Thrift vs the TypeSerializer IF/OF:
> the former outperformed the latter for a simple filter (not pushed
> down) and a map+sum (something like 2 s vs 33 s, not counting disk
> space usage, which is much worse). Is that normal, or is TypeSerializer
> supposed to perform better than this?
>
>
> On Fri, Nov 27, 2015 at 3:39 PM, Fabian Hueske  wrote:
>
>> If you are just looking for an exchange format between two Flink jobs, I
>> would go for the TypeSerializerInput/OutputFormat.
>> Note that these are binary formats.
>>
>> Best, Fabian
>>
>> 2015-11-27 15:28 GMT+01:00 Flavio Pompermaier :
>>
>>> Hi to all,
>>>
>>> I have a complex POJO (with nested objects) that I'd like to write and
>>> read with Flink (batch).
>>> What is the simplest way to do that? I can't find any example of it :(
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>


Re: POJO Dataset read and write

2015-11-27 Thread Flavio Pompermaier
I made a simple test comparing Parquet + Thrift vs the TypeSerializer IF/OF:
the former outperformed the latter for a simple filter (not pushed
down) and a map+sum (something like 2 s vs 33 s, not counting disk
space usage, which is much worse). Is that normal, or is TypeSerializer
supposed to perform better than this?

On Fri, Nov 27, 2015 at 3:39 PM, Fabian Hueske  wrote:

> If you are just looking for an exchange format between two Flink jobs, I
> would go for the TypeSerializerInput/OutputFormat.
> Note that these are binary formats.
>
> Best, Fabian
>
> 2015-11-27 15:28 GMT+01:00 Flavio Pompermaier :
>
>> Hi to all,
>>
>> I have a complex POJO (with nested objects) that I'd like to write and
>> read with Flink (batch).
>> What is the simplest way to do that? I can't find any example of it :(
>>
>> Best,
>> Flavio
>>
>
>


Re: Working with the Windowing functionality

2015-11-27 Thread Aljoscha Krettek
Hi,
yes, you are right in your analysis. Did you try running it with always setting 
the timer? Maybe it’s not the bottleneck of the computation. I would be very 
interested in seeing how this behaves since I only did tests with regular time 
windows, where the first if statement almost always directly returns, which is 
very cheap.

Cheers,
Aljoscha
> On 27 Nov 2015, at 13:59, Niels Basjes  wrote:
> 
> Hi,
> 
> Thanks for all this input.
> I didn't know about the 
>   // a trigger can only have 1 timer so we remove the old trigger when 
> setting the new one
> 
> This insight is to me of major importance.
> Let me explain: 
> I found in the WindowOperator this code below.
> 
> @Override
> public void registerEventTimeTimer(long time) {
>if (watermarkTimer == time) {
>   // we already have set a trigger for that time
>   return;
>}
>    Set<Context> triggers = watermarkTimers.get(time);
>if (triggers == null) {
>   triggers = new HashSet<>();
>   watermarkTimers.put(time, triggers);
>}
>this.watermarkTimer = time;
>triggers.add(this);
> }
> 
> and
> 
> if (time == watermarkTimer) {
>watermarkTimer = -1;
>Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, 
> window, this);
> 
> Effectively the new timer value is stored, yet at the moment the trigger
> fires the call is not forwarded into the application.
> So if I did it as you show in your example, I would have the same number
> of trigger entries in the watermarkTimers set as I have seen events.
> My application will (in total) handle about 50K events/sec, resulting in
> thousands of 'onEventTime' calls per second.
>
> So thank you. I now understand I have to be more careful with these timers!
> 
> Niels Basjes
> 
> 
> 
> On Fri, Nov 27, 2015 at 11:28 AM, Aljoscha Krettek  
> wrote:
> Hi Niels,
> do the records that arrive from Kafka already have the session ID or do you 
> want to assign them inside your Flink job based on the idle timeout?
> 
> For the rest of your problems you should be able to get by with what Flink 
> provides:
> 
> The triggering can be done using a custom Trigger that fires after we haven’t 
> seen an element for 30 minutes.
> public class TimeoutTrigger implements Trigger<Object, Window> {
>    private static final long serialVersionUID = 1L;
>
>    @Override
>    public TriggerResult onElement(Object element, long timestamp, Window window,
>          TriggerContext ctx) throws Exception {
>       // on every element it will set a timer for 30 seconds in the future
>       // a trigger can only have 1 timer so we remove the old trigger when
>       // setting the new one
>       ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 30000); // 30 seconds, but you can change it
>       return TriggerResult.CONTINUE;
>    }
>
>    @Override
>    public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) {
>       return TriggerResult.CONTINUE;
>    }
>
>    @Override
>    public TriggerResult onProcessingTime(long time, Window window,
>          TriggerContext ctx) throws Exception {
>       return TriggerResult.FIRE_AND_PURGE;
>    }
>
>    @Override
>    public String toString() {
>       return "TimeoutTrigger()";
>    }
> }
> 
> you would use it like this:
> stream.keyBy(…).window(…).trigger(new TimeoutTrigger())
> 
> For writing to files you could use the RollingSink 
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#hadoop-filesystem).
>  I think this does pretty much what you want. You can specify how large the 
> files that it writes are, and it can also roll to new files on a specified 
> time interval.
> 
> Please let us know if you need more information.
> 
> Cheers,
> Aljoscha
> > On 26 Nov 2015, at 22:13, Niels Basjes  wrote:
> >
> > Hi,
> >
> > I'm trying to build something in Flink that relies heavily on the Windowing 
> > features.
> >
> > In essence what I want to build:
> > I have clickstream data coming in via Kafka. Each record (click) has a 
> > sessionid and a timestamp.
> > I want to create a window for each session and after 30 minutes idle I want 
> > all events for that session (visit) to be written to disk.
> > This should result in the effect that a specific visit exists in exactly 
> > one file.
> > Since HDFS does not like 'small files' I want to create a (set of) files 
> > every 15 minutes that contains several complete  visits.
> > So I need to buffer the 'completed visits' and flush them to disk in 15 
> > minute batches.
> >
> > What I think I need to get this is:
> > 1) A map function that assigns the visit-id (i.e. new id after 30 minutes 
> > idle)
> > 2) A window per visit-id (close the window 30 minutes after the last click)
> > 3) A window per 15 minutes that only contains windows of visits that are 
> > complete
> >
> > Today I've been trying to get this setup and I think I have some parts that 
> > are in the right direction.
> >
> > I have some questions and I'm hoping you guys ca

Re: Doubt about window and count trigger

2015-11-27 Thread Stephan Ewen
Hi!

The reason why trigger state is purged right now with the window is to make
sure that no memory is occupied any more after the purge.
Otherwise, memory consumption would just grow indefinitely, holding state
of old triggers.

Greetings,
Stephan


On Fri, Nov 27, 2015 at 4:05 PM, Fabian Hueske  wrote:

> When a window is purged, the Trigger and its state are also cleared.
> A new window comes with a new Trigger (and a new state).
> So yes, in your example the window will be fired after 30 secs again.
>
> Best, Fabian
>
> 2015-11-27 16:01 GMT+01:00 Anwar Rizal :
>
>> Thanks Fabian,
>>
>> Just for completion.
>> In that 1 min window, is my modified count trigger still valid? Say, if
>> in that one-minute window I have 100 events after 30 s, will it still fire
>> at the 30th second?
>>
>> Cheers,
>> anwar.
>>
>>
>>
>> On Fri, Nov 27, 2015 at 3:31 PM, Fabian Hueske  wrote:
>>
>>> Hi Anwar,
>>>
>>> Your trigger looks good!
>>>
>>> I just want to make sure you know what exactly happens after a
>>> window was evaluated and purged.
>>> Once a window was purged, the whole window is cleared and removed. If a
>>> new element arrives that would have fit into the purged window, a new
>>> window with exactly the same time boundaries is created, i.e., if you have
>>> a 5 min time window that is fired and purged in minute 4 and a new element
>>> arrives immediately after the purging, it is put into a window that will
>>> only "exist" for 1 more minute (and not start a new 5 minute window).
>>>
>>> Cheers, Fabian
>>>
>>>
>>> 2015-11-27 14:59 GMT+01:00 Anwar Rizal :
>>>
 Thanks Fabian and Aljoscha,

 I try to implement the trigger as you described as follow:

 https://gist.github.com/anonymous/d0578a4d27768a75bea4
 

 It works fine , indeed.

 Thanks,
 Anwar


 On Fri, Nov 27, 2015 at 11:49 AM, Aljoscha Krettek >>> > wrote:

> Hi Anwar,
> what Fabian wrote is completely right. I just want to give the
> reasoning for why the CountTrigger behaves as it does. The idea was to 
> have
> Triggers that clearly focus on one thing and then at some point add
> combination triggers. For example, an OrTrigger that triggers if either of
> its sub triggers fires, or an AndTrigger that triggers after both its
> sub triggers fire. (There is also more complex stuff that could be thought
> of here.)
>
> Cheers,
> Aljoscha
> > On 27 Nov 2015, at 09:59, fhue...@gmail.com wrote:
> >
> >
> > Hi,
> >
> > a regular tumbling time window of 5 seconds gets all elements within
> that period of time (semantics of time varies for processing, ingestion,
> and event time modes) and triggers the execution after 5 seconds.
> >
> > If you define a custom trigger, the assignment policy remains the
> same, but the trigger condition is overwritten (it is NOT additional but
> replaces the default condition), i.e., in your implementation, it will 
> only
> trigger when 100 elements arrived. In order to trigger also when the 
> window
> time expires, you have to register a timer (processing time or event time
> timer) via the trigger context.
> > NOTE: The window assigner will continue to assign elements to the
> window, even if the window was already evaluated. If you PURGE the window
> and an element arrives after that, a new window is created.
> >
> > To implement your trigger, you have to register a timer in the
> onEvent() method with:
> > ctx.registerEventTimeTimer(window.getEnd)
> > You can do that in every onEvent() call, because the timer is always
> overwritten.
> >
> > NOTE: you should use Flink’s keyed-state (access via triggerContext)
> if you want to keep state such as the current count.
> >
> > Hope this helps. Please let me know if you have further questions.
> > Fabian
> >
> >
> >
> >
> > From: Matthias J. Sax
> > Sent: Friday, November 27, 2015 08:44
> > To: user@flink.apache.org
> > Subject: Re: Doubt about window and count trigger
> >
> >
> > Hi,
> >
> > a Trigger is an *additional* condition for intermediate (early)
> > evaluation of the window. Thus, it is not "or-ed" to the basic window
> > definition.
> >
> > If you want to have an or-ed window condition, you can customize it
> by
> > specifying your own window definition.
> >
> > > dataStream.window(new MyOwnWindow() extends WindowAssigner { /*
> put your code here */ );
> >
> > -Matthias
> >
> >
> > On 11/26/2015 11:40 PM, Anwar Rizal wrote:
> > > Hi all,
> > >
> > > From the documentation:
> > > "The |Trigger| specifies when the function that comes after the
> window
> > > clause (e.g., |sum|, |count|) is evaluated (“fires”) for each
> window."
>>

Re: Doubt about window and count trigger

2015-11-27 Thread Fabian Hueske
When a window is purged, the Trigger and its state are also cleared.
A new window comes with a new Trigger (and a new state).
So yes, in your example the window will be fired after 30 secs again.

Best, Fabian

2015-11-27 16:01 GMT+01:00 Anwar Rizal :

> Thanks Fabian,
>
> Just for completion.
> In that 1 min window, is my modified count trigger still valid? Say, if
> in that one-minute window I have 100 events after 30 s, will it still fire
> at the 30th second?
>
> Cheers,
> anwar.
>
>
>
> On Fri, Nov 27, 2015 at 3:31 PM, Fabian Hueske  wrote:
>
>> Hi Anwar,
>>
>> Your trigger looks good!
>>
>> I just want to make sure you know what exactly happens after a
>> window was evaluated and purged.
>> Once a window was purged, the whole window is cleared and removed. If a
>> new element arrives that would have fit into the purged window, a new
>> window with exactly the same time boundaries is created, i.e., if you have
>> a 5 min time window that is fired and purged in minute 4 and a new element
>> arrives immediately after the purging, it is put into a window that will
>> only "exist" for 1 more minute (and not start a new 5 minute window).
>>
>> Cheers, Fabian
>>
>>
>> 2015-11-27 14:59 GMT+01:00 Anwar Rizal :
>>
>>> Thanks Fabian and Aljoscha,
>>>
>>> I try to implement the trigger as you described as follow:
>>>
>>> https://gist.github.com/anonymous/d0578a4d27768a75bea4
>>> 
>>>
>>> It works fine , indeed.
>>>
>>> Thanks,
>>> Anwar
>>>
>>>
>>> On Fri, Nov 27, 2015 at 11:49 AM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi Anwar,
 what Fabian wrote is completely right. I just want to give the
 reasoning for why the CountTrigger behaves as it does. The idea was to have
 Triggers that clearly focus on one thing and then at some point add
 combination triggers. For example, an OrTrigger that triggers if either of
 its sub triggers fires, or an AndTrigger that triggers after both its
 sub triggers fire. (There is also more complex stuff that could be thought
 of here.)

 Cheers,
 Aljoscha
 > On 27 Nov 2015, at 09:59, fhue...@gmail.com wrote:
 >
 >
 > Hi,
 >
 > a regular tumbling time window of 5 seconds gets all elements within
 that period of time (semantics of time varies for processing, ingestion,
 and event time modes) and triggers the execution after 5 seconds.
 >
 > If you define a custom trigger, the assignment policy remains the
 same, but the trigger condition is overwritten (it is NOT additional but
 replaces the default condition), i.e., in your implementation, it will only
 trigger when 100 elements arrived. In order to trigger also when the window
 time expires, you have to register a timer (processing time or event time
 timer) via the trigger context.
 > NOTE: The window assigner will continue to assign elements to the
 window, even if the window was already evaluated. If you PURGE the window
 and an element arrives after that, a new window is created.
 >
 > To implement your trigger, you have to register a timer in the
 onEvent() method with:
 > ctx.registerEventTimeTimer(window.getEnd)
 > You can do that in every onEvent() call, because the timer is always
 overwritten.
 >
 > NOTE: you should use Flink’s keyed-state (access via triggerContext)
 if you want to keep state such as the current count.
 >
 > Hope this helps. Please let me know if you have further questions.
 > Fabian
 >
 >
 >
 >
 > From: Matthias J. Sax
 > Sent: Friday, November 27, 2015 08:44
 > To: user@flink.apache.org
 > Subject: Re: Doubt about window and count trigger
 >
 >
 > Hi,
 >
 > a Trigger is an *additional* condition for intermediate (early)
 > evaluation of the window. Thus, it is not "or-ed" to the basic window
 > definition.
 >
 > If you want to have an or-ed window condition, you can customize it by
 > specifying your own window definition.
 >
 > > dataStream.window(new MyOwnWindow() extends WindowAssigner { /* put
 your code here */ );
 >
 > -Matthias
 >
 >
 > On 11/26/2015 11:40 PM, Anwar Rizal wrote:
 > > Hi all,
 > >
 > > From the documentation:
 > > "The |Trigger| specifies when the function that comes after the
 window
 > > clause (e.g., |sum|, |count|) is evaluated (“fires”) for each
 window."
 > >
 > > So, basically, if I specify:
 > >
 > > |keyedStream
 > > .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))
 > > .trigger(CountTrigger.of(100))|
 > >
 > > |
 > > |
 > >
 > > |The execution of the window function is triggered when the count
 reaches 100 in the time window of 5 seconds. If you have a system that
 never reaches 100 in 5 seconds, basically you will never have the window
 f

Re: Doubt about window and count trigger

2015-11-27 Thread Anwar Rizal
Thanks Fabian,

Just for completion.
In that 1 min window, is my modified count trigger still valid? Say, if in
that one-minute window I have 100 events after 30 s, will it still fire at
the 30th second?

Cheers,
anwar.



On Fri, Nov 27, 2015 at 3:31 PM, Fabian Hueske  wrote:

> Hi Anwar,
>
> Your trigger looks good!
>
> I just want to make sure you know what exactly happens after a
> window was evaluated and purged.
> Once a window was purged, the whole window is cleared and removed. If a
> new element arrives that would have fit into the purged window, a new
> window with exactly the same time boundaries is created, i.e., if you have
> a 5 min time window that is fired and purged in minute 4 and a new element
> arrives immediately after the purging, it is put into a window that will
> only "exist" for 1 more minute (and not start a new 5 minute window).
>
> Cheers, Fabian
>
>
> 2015-11-27 14:59 GMT+01:00 Anwar Rizal :
>
>> Thanks Fabian and Aljoscha,
>>
>> I try to implement the trigger as you described as follow:
>>
>> https://gist.github.com/anonymous/d0578a4d27768a75bea4
>> 
>>
>> It works fine , indeed.
>>
>> Thanks,
>> Anwar
>>
>>
>> On Fri, Nov 27, 2015 at 11:49 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi Anwar,
>>> what Fabian wrote is completely right. I just want to give the reasoning
>>> for why the CountTrigger behaves as it does. The idea was to have Triggers
>>> that clearly focus on one thing and then at some point add combination
>>> triggers. For example, an OrTrigger that triggers if either of its sub
>>> triggers fires, or an AndTrigger that triggers after both its sub
>>> triggers fire. (There is also more complex stuff that could be thought of
>>> here.)
>>>
>>> Cheers,
>>> Aljoscha
>>> > On 27 Nov 2015, at 09:59, fhue...@gmail.com wrote:
>>> >
>>> >
>>> > Hi,
>>> >
>>> > a regular tumbling time window of 5 seconds gets all elements within
>>> that period of time (semantics of time varies for processing, ingestion,
>>> and event time modes) and triggers the execution after 5 seconds.
>>> >
>>> > If you define a custom trigger, the assignment policy remains the
>>> same, but the trigger condition is overwritten (it is NOT additional but
>>> replaces the default condition), i.e., in your implementation, it will only
>>> trigger when 100 elements arrived. In order to trigger also when the window
>>> time expires, you have to register a timer (processing time or event time
>>> timer) via the trigger context.
>>> > NOTE: The window assigner will continue to assign elements to the
>>> window, even if the window was already evaluated. If you PURGE the window
>>> and an element arrives after that, a new window is created.
>>> >
>>> > To implement your trigger, you have to register a timer in the
>>> onEvent() method with:
>>> > ctx.registerEventTimeTimer(window.getEnd)
>>> > You can do that in every onEvent() call, because the timer is always
>>> overwritten.
>>> >
>>> > NOTE: you should use Flink’s keyed-state (access via triggerContext)
>>> if you want to keep state such as the current count.
>>> >
>>> > Hope this helps. Please let me know if you have further questions.
>>> > Fabian
>>> >
>>> >
>>> >
>>> >
>>> > From: Matthias J. Sax
>>> > Sent: Friday, November 27, 2015 08:44
>>> > To: user@flink.apache.org
>>> > Subject: Re: Doubt about window and count trigger
>>> >
>>> >
>>> > Hi,
>>> >
>>> > a Trigger is an *additional* condition for intermediate (early)
>>> > evaluation of the window. Thus, it is not "or-ed" to the basic window
>>> > definition.
>>> >
>>> > If you want to have an or-ed window condition, you can customize it by
>>> > specifying your own window definition.
>>> >
>>> > > dataStream.window(new MyOwnWindow() extends WindowAssigner { /* put
>>> your code here */ );
>>> >
>>> > -Matthias
>>> >
>>> >
>>> > On 11/26/2015 11:40 PM, Anwar Rizal wrote:
>>> > > Hi all,
>>> > >
>>> > > From the documentation:
>>> > > "The |Trigger| specifies when the function that comes after the
>>> window
>>> > > clause (e.g., |sum|, |count|) is evaluated (“fires”) for each
>>> window."
>>> > >
>>> > > So, basically, if I specify:
>>> > >
>>> > > |keyedStream
>>> > > .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))
>>> > > .trigger(CountTrigger.of(100))|
>>> > >
>>> > > |
>>> > > |
>>> > >
>>> > > |The execution of the window function is triggered when the count
>>> reaches 100 in the time window of 5 seconds. If you have a system that
>>> never reaches 100 in 5 seconds, basically you will never have the window
>>> fired.|
>>> > >
>>> > > |
>>> > > |
>>> > >
>>> > > |My question is, what would be the best option to have behavior as
>>> follow:|
>>> > >
>>> > > |The execution of the window function is triggered when 5 seconds is
>>> reached or 100 events are received before 5 seconds.|
>>> > >
>>> > >
>>> > > I think of implementing my own trigger that looks like CountTrigger,
>>> but that will fire also wh

Re: POJO Dataset read and write

2015-11-27 Thread Fabian Hueske
If you are just looking for an exchange format between two Flink jobs, I
would go for the TypeSerializerInput/OutputFormat.
Note that these are binary formats.
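
A minimal sketch of the round trip (assuming the Scala batch API; MyPojo is a
stand-in case class for the actual POJO, and the exact constructor signatures
may differ slightly between Flink versions):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}

    case class MyPojo(id: Int, name: String)  // stand-in for the complex POJO

    val env = ExecutionEnvironment.getExecutionEnvironment
    val path = "file:///tmp/pojo-dataset"

    // write the DataSet in Flink's binary serializer format
    env.fromElements(MyPojo(1, "a"), MyPojo(2, "b"))
      .write(new TypeSerializerOutputFormat[MyPojo], path)
    env.execute()

    // read it back; the input format needs the type information
    val typeInfo = createTypeInformation[MyPojo]
    val readBack = env.readFile(new TypeSerializerInputFormat[MyPojo](typeInfo), path)
    readBack.print()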

Best, Fabian

2015-11-27 15:28 GMT+01:00 Flavio Pompermaier :

> Hi to all,
>
> I have a complex POJO (with nested objects) that I'd like to write and
> read with Flink (batch).
> What is the simplest way to do that? I can't find any example of it :(
>
> Best,
> Flavio
>


Re: flink connectors

2015-11-27 Thread Stephan Ewen
The reason why the binary distribution does not contain all connectors is
that this would add all libraries used by the connectors into the binary
distribution jar.

These libraries partly conflict with each other, and often conflict with
the libraries used by the user's programs. Not including these libraries in
the binary distribution makes life of users much easier with respect to
dependency-conflicts.
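
For completeness, pulling a connector into a job build then looks roughly
like this (artifact name as on the Maven repository page linked in the
quoted mail; adjust the version to your setup):

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>0.10.1</version>
    </dependency>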

Greetings,
Stephan


On Fri, Nov 27, 2015 at 3:06 PM, Radu Tudoran 
wrote:

> Hi,
>
> Thank you for the tips!
>
> For future reference, in case someone else wants to search for the
> binaries for this, I would like to share the link to the Maven repository:
>
> http://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
>
>
>
> Dr. Radu Tudoran
> Research Engineer
> IT R&D Division
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
> -Original Message-
> From: Matthias J. Sax [mailto:mj...@apache.org]
> Sent: Friday, November 27, 2015 2:53 PM
> To: user@flink.apache.org
> Subject: Re: flink connectors
>
> If I understand the question right, you just want to download the jar
> manually?
>
> Just go to the maven repository website and download the jar from there.
>
>
> -Matthias
>
> On 11/27/2015 02:49 PM, Robert Metzger wrote:
> > Maybe there is a maven mirror you can access from your network?
> >
> > This site contains a list of some mirrors
> > http://stackoverflow.com/questions/5233610/what-are-the-official-mirrors-of-the-maven-central-repository
> > You don't have to use the maven tool, you can also manually browse for
> > the jars and download what you need.
> >
> >
> > On Fri, Nov 27, 2015 at 2:46 PM, Fabian Hueske  > > wrote:
> >
> > You can always build Flink from source, but apart from that I am not
> > aware of an alternative.
> >
> > 2015-11-27 14:42 GMT+01:00 Radu Tudoran  > >:
> >
> > Hi,
> >
> > Is there any alternative to avoiding maven?
> >
> > That is why I was curious if there is a binary distribution of
> > this available for download directly
> >
> > Dr. Radu Tudoran
> >
> > Research Engineer
> >
> > IT R&D Division
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> >
> > European Research Center
> >
> > Riesstrasse 25, 80992 München
> >
> > E-mail: radu.tudo...@huawei.com
> >
> > Mobile: +49 15209084330
> >
> > Telephone: +49 891588344173
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> >
> > This e-mail and its attachments contain confidential information
> > from HUAWEI, which is intended only for the person or entity
> > whose address is listed above. Any use of the information
> > contained herein in any way (including, but not limited to,
> > total or partial disclosure, reproduction, or dissemination) by
> > persons other than the intended recipient(s) is prohibited. If
> > you receive this e-mail in error, please notify the sender by
> > phone or email immediately and delete it!
> >
> > *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> > *Sent:* Friday, November 27, 2015 2:41 PM

Re: Doubt about window and count trigger

2015-11-27 Thread Fabian Hueske
Hi Anwar,

Your trigger looks good!

I just want to make sure you know what exactly happens after a
window was evaluated and purged.
Once a window was purged, the whole window is cleared and removed. If a new
element arrives that would have fit into the purged window, a new window
with exactly the same time boundaries is created, i.e., if you have a 5 min
time window that is fired and purged in minute 4 and a new element arrives
immediately after the purging, it is put into a window that will only
"exist" for 1 more minute (and not start a new 5 minute window).

Cheers, Fabian


2015-11-27 14:59 GMT+01:00 Anwar Rizal :

> Thanks Fabian and Aljoscha,
>
> I try to implement the trigger as you described as follow:
>
> https://gist.github.com/anonymous/d0578a4d27768a75bea4
> 
>
> It works fine , indeed.
>
> Thanks,
> Anwar
>
>
> On Fri, Nov 27, 2015 at 11:49 AM, Aljoscha Krettek 
> wrote:
>
>> Hi Anwar,
>> what Fabian wrote is completely right. I just want to give the reasoning
>> for why the CountTrigger behaves as it does. The idea was to have Triggers
>> that clearly focus on one thing and then at some point add combination
>> triggers. For example, an OrTrigger that triggers if either of its sub
>> triggers fires, or an AndTrigger that triggers after both its sub
>> triggers fire. (There is also more complex stuff that could be thought of
>> here.)
>>
>> Cheers,
>> Aljoscha
>> > On 27 Nov 2015, at 09:59, fhue...@gmail.com wrote:
>> >
>> >
>> > Hi,
>> >
>> > a regular tumbling time window of 5 seconds gets all elements within
>> that period of time (semantics of time varies for processing, ingestion,
>> and event time modes) and triggers the execution after 5 seconds.
>> >
>> > If you define a custom trigger, the assignment policy remains the same,
>> but the trigger condition is overwritten (it is NOT additional but replaces
>> the default condition), i.e., in your implementation, it will only trigger
>> when 100 elements arrived. In order to trigger also when the window time
>> expires, you have to register a timer (processing time or event time timer)
>> via the trigger context.
>> > NOTE: The window assigner will continue to assign elements to the
>> window, even if the window was already evaluated. If you PURGE the window
>> and an element arrives after that, a new window is created.
>> >
>> > To implement your trigger, you have to register a timer in the
>> onEvent() method with:
>> > ctx.registerEventTimeTimer(window.getEnd)
>> > You can do that in every onEvent() call, because the timer is always
>> overwritten.
>> >
>> > NOTE: you should use Flink’s keyed-state (access via triggerContext) if
>> you want to keep state such as the current count.
>> >
>> > Hope this helps. Please let me know if you have further questions.
>> > Fabian
>> >
>> >
>> >
>> >
>> > From: Matthias J. Sax
>> > Sent: Friday, November 27, 2015 08:44
>> > To: user@flink.apache.org
>> > Subject: Re: Doubt about window and count trigger
>> >
>> >
>> > Hi,
>> >
>> > a Trigger is an *additional* condition for intermediate (early)
>> > evaluation of the window. Thus, it is not "or-ed" to the basic window
>> > definition.
>> >
>> > If you want to have an or-ed window condition, you can customize it by
>> > specifying your own window definition.
>> >
>> > > dataStream.window(new MyOwnWindow() extends WindowAssigner { /* put
>> your code here */ );
>> >
>> > -Matthias
>> >
>> >
>> > On 11/26/2015 11:40 PM, Anwar Rizal wrote:
>> > > Hi all,
>> > >
>> > > From the documentation:
>> > > "The |Trigger| specifies when the function that comes after the window
>> > > clause (e.g., |sum|, |count|) is evaluated (“fires”) for each window."
>> > >
>> > > So, basically, if I specify:
>> > >
>> > > |keyedStream
>> > > .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))
>> > > .trigger(CountTrigger.of(100))|
>> > >
>> > > |
>> > > |
>> > >
>> > > |The execution of the window function is triggered when the count
>> reaches 100 in the time window of 5 seconds. If you have a system that
>> never reaches 100 in 5 seconds, basically you will never have the window
>> fired.|
>> > >
>> > > |
>> > > |
>> > >
>> > > |My question is, what would be the best option to have behavior as
>> follow:|
>> > >
>> > > |The execution of the window function is triggered when 5 seconds is
>> reached or 100 events are received before 5 seconds.|
>> > >
>> > >
>> > > I think of implementing my own trigger that looks like CountTrigger,
>> but that will fire also when the end of time window is reached (at the
>> moment, it just returns Continue, instead of Fired). But maybe there's a
>> better way ?
>> > >
>> > > Is there a reason why CountTrigger is implemented as it is
>> implemented today, and not as I described above (5 seconds or 100 events
>> reached, whichever comes first).
>> > >
>> > >
>> > > Thanks,
>> > >
>> > > Anwar.
>> > >
>>
>>
>


Re: Cleanup of OperatorStates?

2015-11-27 Thread Stephan Ewen
Hey Niels!

You may be able to implement this in windows anyways, depending on your
setup. You can definitely implement state with timeout yourself (using the
more low-level state interface), or you may be able to use custom windows
for that (they can trigger on every element and return elements
immediately, thereby giving you low latency).
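
A minimal sketch of the manual-state variant, using the null-release
mentioned in my earlier mail quoted below (VisitState and the end-of-session
check are placeholders; the state accessors are the ones used elsewhere on
this list):

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.common.state.OperatorState
    import org.apache.flink.configuration.Configuration

    case class VisitState(clicks: Int)  // placeholder for the per-visit data

    // the stream is keyed by sessionid before this map function
    class VisitTracker extends RichMapFunction[(String, Long), (String, Int)] {
      private var visit: OperatorState[VisitState] = _

      override def open(config: Configuration): Unit = {
        visit = getRuntimeContext.getKeyValueState("visit", classOf[VisitState], null)
      }

      override def map(click: (String, Long)): (String, Int) = {
        val updated = VisitState(Option(visit.value()).map(_.clicks).getOrElse(0) + 1)
        if (endOfSession(click)) {
          visit.update(null)  // setting the value to null releases the state for this key
        } else {
          visit.update(updated)
        }
        (click._1, updated.clicks)
      }

      private def endOfSession(click: (String, Long)): Boolean = false  // placeholder
    }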

Can you tell me where exactly the session ID comes from? Is that something
that the function with state generates itself?
Depending on that answer, I can outline either the window, or the custom
state way...

Greetings,
Stephan





On Fri, Nov 27, 2015 at 2:19 PM, Niels Basjes  wrote:

> Hi,
>
> Thanks for the explanation.
> I have clickstream data arriving in realtime and I need to assign the
> visitId and stream it out again (with the visitId now begin part of the
> record) into Kafka with the lowest possible latency.
> Although the Window feature allows me to group and close the visit on a
> timeout/expire (as shown to me by Aljoscha in a separate email) it does
> make a 'window'.
>
> So (as requested) I created a ticket for such a feature:
> https://issues.apache.org/jira/browse/FLINK-3089
>
> Niels
>
>
>
>
>
>
> On Fri, Nov 27, 2015 at 11:51 AM, Stephan Ewen  wrote:
>
>> Hi Niels!
>>
>> Currently, state is released by setting the value for the key to null. If
>> you are tracking web sessions, you can try and send a "end of session"
>> element that sets the value to null.
>>
>> To be on the safe side, you probably want state that is automatically
>> purged after a while. I would look into using Windows for that. The
>> triggers there are flexible so you can schedule both actions on elements
>> plus cleanup after a certain time delay (clock time or event time).
>>
>> The question about "state expiry" has come a few times. People seem to
>> like working on state directly, but it should clean up automatically.
>>
>> Can you see if your use case fits onto windows, otherwise open a ticket
>> for state expiry?
>>
>> Greetings,
>> Stephan
>>
>>
>> On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes  wrote:
>>
>>> Hi,
>>>
>>> I'm working on a streaming application that ingests clickstream data.
>>> In a specific part of the flow I need to retain a little bit of state
>>> per visitor (i.e. keyBy(sessionid) )
>>>
>>> So I'm using the Key/Value state interface (i.e. OperatorState)
>>> in a map function.
>>>
>>> Now in my application I expect to get a huge number of sessions per day.
>>> Since these sessionids are 'random' and become unused after the visitor
>>> leaves the website, over time the system will have seen millions of those
>>> sessionids.
>>>
>>> So I was wondering: how are these OperatorStates cleaned?
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


POJO Dataset read and write

2015-11-27 Thread Flavio Pompermaier
Hi to all,

I have a complex POJO (with nested objects) that I'd like to write and read
with Flink (batch).
What is the simplest way to do that? I can't find any example of it :(

Best,
Flavio


[ANNOUNCE] Flink 0.10.1 released

2015-11-27 Thread Robert Metzger
The Flink PMC is pleased to announce the availability of Flink 0.10.1.

The official release announcement:
http://flink.apache.org/news/2015/11/27/release-0.10.1.html
Release binaries: http://apache.openmirror.de/flink/flink-0.10.1/

Please update your maven dependencies to the new 0.10.1 version and update
your binaries.
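
For example, a dependency entry after the bump (one artifact shown; adjust
to the modules your project actually uses):

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala</artifactId>
      <version>0.10.1</version>
    </dependency>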


On behalf of the Flink PMC, I would like to thank everybody who contributed
to the release!


Re: Interpretation of Trigger and Eviction on a window

2015-11-27 Thread Fabian Hueske
Hi Nirmalya,

it is correct that the evictor is called BEFORE the window function is
applied because this is required to support certain types of sliding
windows.
If you want to remove all elements from the window after the window
function was applied, you need a trigger that purges the window. The
CountTrigger is not doing this; however, you can wrap it in a
PurgingTrigger:

windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(10))).sum(_._1)

Best, Fabian

2015-11-27 13:25 GMT+01:00 Nirmalya Sengupta :

> Hello there.
>
> I have just started exploring Apache Flink, and it has immediately got me
> excited. Because I am a beginner, my questions may be a bit naive. Please
> bear with me.
>
> I refer to this particular sentence from the Flink 0.10.0 Guide:
>
> ' *After the trigger fires, and before the function (e.g., sum, count) is
> applied to the window contents, an optional Evictor removes some elements
> from the beginning of the window before the remaining elements are passed
> on to the function* '
>
> I am a bit confused with the assertion that elements are evicted *before*
> the function is applied. Let me elaborate what my understanding is.
>
> Let us say that my window has a 'count' trigger of 10 elements, with some
> time-pane of 2 seconds (assumption: rate of ingestion is fast enough for at
> least 10 elements to arrive within 2 seconds).
>
> windowedStream.*trigger*(CountTrigger.of(10)).*evictor*(CountEvictor.of(10)).sum(_._1)
> // summation of the 2nd field of a tuple
>
> Now, till the time 9 elements have gathered in the window, the trigger is
> dormant. After the 10th element  enters the window-pane, the trigger is
> fired. At this point in time, all these 10 elements should be passed to the
> _sum_ function so that correct summated value is generated and **only
> then** the evictor is allowed to take out all the 10 elements leaving the
> window-pane empty. The window's element count is set to zero and  it awaits
> the arrival of the next element.
>
> However, what the documents seems to suggest is that the evictor will be
> able to take out _some_ (how many?) elements from the _beginning_ of the
> window, before the _sum_ function can see the elements. Isn't this
> counterintuitive or am I missing something obvious here?
>
> Will keenly wait to hear from you.
>
> -- Nirmalya
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>


Re: Continuing from the stackoverflow post

2015-11-27 Thread Fabian Hueske
Hi Nirmalya,

can you describe the semantics that you want to implement?
Do you want to find the max temperature every 5 milliseconds or the max of
every 5 records?

Right now, you are using a non-keyed timeWindow of 5 milliseconds. This
will create a window for the complete stream every 5 msecs.
However, because you overwrite the time trigger of the timeWindow, the
window is not evaluated after 5 msecs, but instead after 5 elements were
added to the window. Because of the evictor, only 4 elements are given into
the max aggregation function. However, the window is not closed after the
trigger was fired. Instead, more elements are added to the window until the
5 msecs are over. Then a new window is opened and triggered again after 5
elements were added.
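
As an aside: if the goal is the maximum of every 5 records, one way to
express that directly is a global window with a purging count trigger, so
that the window and the trigger are driven by the same count (a sketch
against the 0.10 Scala API; untested):

    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}

    val maxEvery5 = readIncomingReadings(env, "./sampleIOTTiny.csv")
      .map(e => (e.sensorUUID, e.ambientTemperature))
      .windowAll(GlobalWindows.create())
      .trigger(PurgingTrigger.of(CountTrigger.of(5)))
      .maxBy(1)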

I know, these things are not easy to understand. We are working on a
document that clarifies the dependencies of window assigner, trigger,
evictor, window function, processing / event time, etc.

Let's start by telling me what you actually want to do ;-)

Cheers, Fabian

2015-11-27 14:49 GMT+01:00 Nirmalya Sengupta :

> Hello Fabian/Matthius,
>
> Many thanks for showing interest in my query on SOF. That helps me sustain
> my enthusiasm. :-)
>
> After setting parallelism of environment to '1' and replacing _max()_ with
> _maxBy()_, I get a list of maximum temperatures but I fail to explain to
> myself, how does Flink arrive at those figures (attached below). I
> understand that different runs will possibly generate different results,
> because I am using **ProcessingTime** characteristic. Yet, I expect some
> kind of a deterministic output which I don't see.
>
> Please prod me to the right direction.
>
> Here's the code I have been referring to:
>
> -
>
> case class IncomingDataUnit(
>   sensorUUID: String, radiationLevel: Int, photoSensor: Float,
>   humidity: Float, timeStamp: Long, ambientTemperature: Float)
>   extends Serializable { }
>
>
> object SocketTextStreamWordCount {
>
>   def main(args: Array[String]) {
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> env.setParallelism(1)
>
> val readings =
>   readIncomingReadings(env,"./sampleIOTTiny.csv")
>   .map(e => (e.sensorUUID,e.ambientTemperature))
>   .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
>   .trigger(CountTrigger.of(5))
>   .evictor(CountEvictor.of(4))
>   .maxBy(1)
>
> readings.print
>
> env.execute("Scala IOT Stream  experiment Example")
>
>   }
>
>   private def readIncomingReadings(env:
> StreamExecutionEnvironment,inputPath: String) :
> DataStream[IncomingDataUnit] = {
> env.readTextFile(inputPath).map(datum => {
>   val fields = datum.split(",")
>   IncomingDataUnit(
> fields(0),  // sensorUUID
> fields(1).toInt,// radiationLevel
> fields(2).toFloat,  // photoSensor
> fields(3).toFloat,  // humidity
> fields(4).toLong,   // timeStamp
> fields(5).toFloat   // ambientTemperature
>   )
> })
>   }
>
>  }
>
> -
>
> Here's the dataset:
> 
>
> probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
> probe-dccefede,199,749.25,78.6057,1448028160,27.46
> probe-f29f9662,199,821.81,81.7831,1448028160,22.35
> probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
> probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
> probe-4d78b545,204,778.42,78.412,1448028160,25.92
> probe-400c5cdf,204,711.65,73.585,1448028160,22.18
> probe-df2d4cad,199,820.8,72.936,1448028161,16.18
> probe-f4ef109e,199,785.68,77.5647,1448028161,16.36
> probe-3fac3350,200,720.12,78.2073,1448028161,19.19
> probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
> probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
> probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
> probe-2323,197,816.06,84.0816,1448028161,4.405
> probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
> probe-20c609fb,204,804.37,84.5243,1448028161,22.87
> probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
> probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
> probe-960906ca,197,797.63,77.4359,1448028162,27.62
> -
>
> And here's the output:
>
> -
>
> (probe-6c75cfbe,30.02)
> (probe-42a9ddca,22.07)
> (probe-960906ca,27.62)
> (probe-400c5cdf,22.18)
> (probe-f076c2b0,29.37)
> (probe-6c75cfbe,30.02)
> (probe-960906ca,27.62)
>
> -
>
>
> -- Nirmalya
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>


RE: flink connectors

2015-11-27 Thread Radu Tudoran
Hi,

Thank you for the tips!

For future reference, in case someone else wants to search for the binaries for 
this, here is the link to the Maven repository:

http://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
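
For example, a minimal dependency entry could look like this (the version is
assumed to match the 0.10.0 release discussed in these threads):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>0.10.0</version>
</dependency>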



Dr. Radu Tudoran
Research Engineer
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-Original Message-
From: Matthias J. Sax [mailto:mj...@apache.org] 
Sent: Friday, November 27, 2015 2:53 PM
To: user@flink.apache.org
Subject: Re: flink connectors

If I understand the question right, you just want to download the jar manually?

Just go to the maven repository website and download the jar from there.


-Matthias

On 11/27/2015 02:49 PM, Robert Metzger wrote:
> Maybe there is a maven mirror you can access from your network?
> 
> This site contains a list of some mirrors 
> http://stackoverflow.com/questions/5233610/what-are-the-official-mirro
> rs-of-the-maven-central-repository
> You don't have to use the maven tool, you can also manually browse for 
> the jars and download what you need.
> 
> 
> On Fri, Nov 27, 2015 at 2:46 PM, Fabian Hueske  > wrote:
> 
> You can always build Flink from source, but apart from that I am not
> aware of an alternative.
> 
> 2015-11-27 14:42 GMT+01:00 Radu Tudoran  >:
> 
> Hi,
> 
> 
> Is there any alternative to avoiding maven?
> 
> That is why I was curious if there is a binary distribution of
> this available for download directly
> 
> 
> Dr. Radu Tudoran
> 
> Research Engineer
> 
> IT R&D Division
> 
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> 
> European Research Center
> 
> Riesstrasse 25, 80992 München
> 
> 
> E-mail: _radu.tudo...@huawei.com
> _
> 
> Mobile: +49 15209084330 
> 
> Telephone: +49 891588344173 
> 
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> 
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
> 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> 
> This e-mail and its attachments contain confidential information
> from HUAWEI, which is intended only for the person or entity
> whose address is listed above. Any use of the information
> contained herein in any way (including, but not limited to,
> total or partial disclosure, reproduction, or dissemination) by
> persons other than the intended recipient(s) is prohibited. If
> you receive this e-mail in error, please notify the sender by
> phone or email immediately and delete it!
> 
> 
> *From:*Fabian Hueske [mailto:fhue...@gmail.com
> ]
> *Sent:* Friday, November 27, 2015 2:41 PM
> *To:* user@flink.apache.org 
> *Subject:* Re: flink connectors
> 
> 
> Hi Radu,
> 
> the connectors are available in Maven Central.
> 
> Just add them as a dependency in your project and they will be
> fetched and included.
> 
> Best, Fabian
> 
> 
> 2015-11-27 14:38 GMT+01:00 Radu Tudoran  >:
> 
> Hi,
> 
>  
> 
> I was trying to use flink connectors. However, when I tried to
> import this
> 
>  
> 
> import org.apache.flink.streaming.connector

Re: flink connectors

2015-11-27 Thread Ovidiu-Cristian MARCU
Hi,

The main question here is why the distribution release doesn’t contain the 
connector dependencies.
It is fair to say that it does not have to (which connectors should it include: some or all?). 
So, just like Spark, Flink offers a binary distribution for Hadoop only, 
without bundling other dependencies.

The thing to consider is whether it would help to offer, on the flink.apache.org 
download page, a customised page that lets the user also choose dependencies 
(connectors) to be included in the downloaded binary.

Best regards,
Ovidiu



> On 27 Nov 2015, at 14:52, Matthias J. Sax  wrote:
> 
> If I understand the question right, you just want to download the jar
> manually?
> 
> Just go to the maven repository website and download the jar from there.
> 
> 
> -Matthias
> 
> On 11/27/2015 02:49 PM, Robert Metzger wrote:
>> Maybe there is a maven mirror you can access from your network?
>> 
>> This site contains a list of some mirrors
>> http://stackoverflow.com/questions/5233610/what-are-the-official-mirrors-of-the-maven-central-repository
>> You don't have to use the maven tool, you can also manually browse for
>> the jars and download what you need.
>> 
>> 
>> On Fri, Nov 27, 2015 at 2:46 PM, Fabian Hueske > > wrote:
>> 
>>You can always build Flink from source, but apart from that I am not
>>aware of an alternative.
>> 
>>2015-11-27 14:42 GMT+01:00 Radu Tudoran >>:
>> 
>>Hi,
>> 
>> 
>>Is there any alternative to avoiding maven?
>> 
>>That is why I was curious if there is a binary distribution of
>>this available for download directly
>> 
>> 
>>Dr. Radu Tudoran
>> 
>>Research Engineer
>> 
>>IT R&D Division
>> 
>> 
>>HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> 
>>European Research Center
>> 
>>Riesstrasse 25, 80992 München
>> 
>> 
>>E-mail: _radu.tudo...@huawei.com
>>_
>> 
>>Mobile: +49 15209084330 
>> 
>>Telephone: +49 891588344173 
>> 
>> 
>>HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>>
>>Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>>Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>>Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
>>56063,
>>Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>> 
>>This e-mail and its attachments contain confidential information
>>from HUAWEI, which is intended only for the person or entity
>>whose address is listed above. Any use of the information
>>contained herein in any way (including, but not limited to,
>>total or partial disclosure, reproduction, or dissemination) by
>>persons other than the intended recipient(s) is prohibited. If
>>you receive this e-mail in error, please notify the sender by
>>phone or email immediately and delete it!
>> 
>> 
>>*From:*Fabian Hueske [mailto:fhue...@gmail.com
>>]
>>*Sent:* Friday, November 27, 2015 2:41 PM
>>*To:* user@flink.apache.org 
>>*Subject:* Re: flink connectors
>> 
>> 
>>Hi Radu,
>> 
>>the connectors are available in Maven Central.
>> 
>>Just add them as a dependency in your project and they will be
>>fetched and included.
>> 
>>Best, Fabian
>> 
>> 
>>2015-11-27 14:38 GMT+01:00 Radu Tudoran >>:
>> 
>>Hi,
>> 
>> 
>> 
>>I was trying to use flink connectors. However, when I tried to
>>import this
>> 
>> 
>> 
>>import org.apache.flink.streaming.connectors.*;
>> 
>> 
>> 
>>I saw that they are not present in the binary distribution as
>>downloaded from website (flink-dist-0.10.0.jar). Is this
>>intentionally? Is there also a binary distribution that contains
>>these connectors?
>> 
>> 
>> 
>>Regards,
>> 
>> 
>> 
>>Dr. Radu Tudoran
>> 
>>Research Engineer
>> 
>>IT R&D Division
>> 
>> 
>> 
>>cid:image007.jpg@01CD52EB.AD060EE0
>> 
>>HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> 
>>European Research Center
>> 
>>Riesstrasse 25, 80992 München
>> 
>> 
>> 
>>E-mail: _radu.tudo...@huawei.com
>>_
>> 
>>Mobile: +49 15209084330 _

Re: Doubt about window and count trigger

2015-11-27 Thread Anwar Rizal
Thanks Fabian and Aljoscha,

I try to implement the trigger as you described as follow:

https://gist.github.com/anonymous/d0578a4d27768a75bea4


It works fine, indeed.
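
For readers without access to the gist, a rough sketch of such a
count-or-end-of-window trigger follows (Flink 0.10 Trigger API; it is
illustrative, not necessarily identical to the gist, and it keeps the count in
a plain field for brevity where real code should use Flink's keyed state, as
Fabian noted):

public class CountOrTimeTrigger implements Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;
    private long count; // illustration only; prefer Flink's keyed state

    public CountOrTimeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
            TimeWindow window, TriggerContext ctx) throws Exception {
        // also fire when the end of the window is reached
        ctx.registerEventTimeTimer(window.getEnd());
        if (++count >= maxCount) {
            count = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        if (time == window.getEnd()) {
            count = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }
}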

Thanks,
Anwar


On Fri, Nov 27, 2015 at 11:49 AM, Aljoscha Krettek 
wrote:

> Hi Anwar,
> what Fabian wrote is completely right. I just want to give the reasoning
> for why the CountTrigger behaves as it does. The idea was to have Triggers
> that clearly focus on one thing and then at some point add combination
> triggers. For example, an OrTrigger that fires if either of its
> sub-triggers fires, or an AndTrigger that fires only after both its
> sub-triggers have fired. (There is also more complex stuff that could be thought of
> here.)
>
> Cheers,
> Aljoscha
> > On 27 Nov 2015, at 09:59, fhue...@gmail.com wrote:
> >
> >
> > Hi,
> >
> > a regular tumbling time window of 5 seconds gets all elements within
> that period of time (semantics of time varies for processing, ingestion,
> and event time modes) and triggers the execution after 5 seconds.
> >
> > If you define a custom trigger, the assignment policy remains the same,
> but the trigger condition is overwritten (it is NOT additional but replaces
> the default condition), i.e., in your implementation, it will only trigger
> when 100 elements arrived. In order to trigger also when the window time
> expires, you have to register a timer (processing time or event time timer)
> via the trigger context.
> > NOTE: The window assigner will continue to assign elements to the
> window, even if the window was already evaluated. If you PURGE the window
> and an element arrives after that, a new window is created.
> >
> > To implement your trigger, you have to register a timer in the onElement()
> method with:
> > ctx.registerEventTimeTimer(window.getEnd)
> > You can do that in every onElement() call, because the timer is always
> overwritten.
> >
> > NOTE: you should use Flink’s keyed-state (access via triggerContext) if
> you want to keep state such as the current count.
> >
> > Hope this helps. Please let me know if you have further questions.
> > Fabian
> >
> >
> >
> >
> > From: Matthias J. Sax
> > Sent: Friday, November 27, 2015 08:44
> > To: user@flink.apache.org
> > Subject: Re: Doubt about window and count trigger
> >
> >
> > Hi,
> >
> > a Trigger is an *additional* condition for intermediate (early)
> > evaluation of the window. Thus, it is not "or-ed" to the basic window
> > definition.
> >
> > If you want to have an or-ed window condition, you can customize it by
> > specifying your own window definition.
> >
> > dataStream.window(new MyOwnWindow() { /* put
> your code here */ }); // where MyOwnWindow extends WindowAssigner
> >
> > -Matthias
> >
> >
> > On 11/26/2015 11:40 PM, Anwar Rizal wrote:
> > > Hi all,
> > >
> > > From the documentation:
> > > "The |Trigger| specifies when the function that comes after the window
> > > clause (e.g., |sum|, |count|) is evaluated (“fires”) for each window."
> > >
> > > So, basically, if I specify:
> > >
> > > keyedStream
> > > .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
> > > .trigger(CountTrigger.of(100))
> > >
> > > The execution of the window function is triggered when the count
> reaches 100 in the time window of 5 seconds. If you have a system that
> never reaches 100 in 5 seconds, basically you will never have the window
> fired.
> > >
> > > My question is, what would be the best option to have behavior as
> follows:
> > >
> > > The execution of the window function is triggered when 5 seconds is
> reached or 100 events are received before 5 seconds.
> > >
> > >
> > > I think of implementing my own trigger that looks like CountTrigger,
> but that will fire also when the end of time window is reached (at the
> moment, it just returns Continue, instead of Fired). But maybe there's a
> better way ?
> > >
> > > Is there a reason why CountTrigger is implemented as it is implemented
> today, and not as I described above (5 seconds or 100 events reached,
> whichever comes first).
> > >
> > >
> > > Thanks,
> > >
> > > Anwar.
> > >
>
>


Re: flink connectors

2015-11-27 Thread Matthias J. Sax
If I understand the question right, you just want to download the jar
manually?

Just go to the maven repository website and download the jar from there.


-Matthias

On 11/27/2015 02:49 PM, Robert Metzger wrote:
> Maybe there is a maven mirror you can access from your network?
> 
> This site contains a list of some mirrors
> http://stackoverflow.com/questions/5233610/what-are-the-official-mirrors-of-the-maven-central-repository
> You don't have to use the maven tool, you can also manually browse for
> the jars and download what you need.
> 
> 
> On Fri, Nov 27, 2015 at 2:46 PM, Fabian Hueske  > wrote:
> 
> You can always build Flink from source, but apart from that I am not
> aware of an alternative.
> 
> 2015-11-27 14:42 GMT+01:00 Radu Tudoran  >:
> 
> Hi,
> 
> 
> Is there any alternative to avoiding maven?
> 
> That is why I was curious if there is a binary distribution of
> this available for download directly
> 
> 
> Dr. Radu Tudoran
> 
> Research Engineer
> 
> IT R&D Division
> 
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> 
> European Research Center
> 
> Riesstrasse 25, 80992 München
> 
> 
> E-mail: _radu.tudo...@huawei.com
> _
> 
> Mobile: +49 15209084330 
> 
> Telephone: +49 891588344173 
> 
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> 
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
> 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> 
> This e-mail and its attachments contain confidential information
> from HUAWEI, which is intended only for the person or entity
> whose address is listed above. Any use of the information
> contained herein in any way (including, but not limited to,
> total or partial disclosure, reproduction, or dissemination) by
> persons other than the intended recipient(s) is prohibited. If
> you receive this e-mail in error, please notify the sender by
> phone or email immediately and delete it!
> 
> 
> *From:*Fabian Hueske [mailto:fhue...@gmail.com
> ]
> *Sent:* Friday, November 27, 2015 2:41 PM
> *To:* user@flink.apache.org 
> *Subject:* Re: flink connectors
> 
> 
> Hi Radu,
> 
> the connectors are available in Maven Central.
> 
> Just add them as a dependency in your project and they will be
> fetched and included.
> 
> Best, Fabian
> 
> 
> 2015-11-27 14:38 GMT+01:00 Radu Tudoran  >:
> 
> Hi,
> 
>  
> 
> I was trying to use flink connectors. However, when I tried to
> import this
> 
>  
> 
> import org.apache.flink.streaming.connectors.*;
> 
>  
> 
> I saw that they are not present in the binary distribution as
> downloaded from website (flink-dist-0.10.0.jar). Is this
> intentionally? Is there also a binary distribution that contains
> these connectors?
> 
>  
> 
> Regards,
> 
>  
> 
> Dr. Radu Tudoran
> 
> Research Engineer
> 
> IT R&D Division
> 
>  
> 
> cid:image007.jpg@01CD52EB.AD060EE0
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> 
> European Research Center
> 
> Riesstrasse 25, 80992 München
> 
>  
> 
> E-mail: _radu.tudo...@huawei.com
> _
> 
> Mobile: +49 15209084330 
> 
> Telephone: +49 891588344173 
> 
>  
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> 
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
> 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> 
> This e-mail and its attachments contain confidential information
> from HUAWEI, which is intended only for the person or entity
> 

Re: flink connectors

2015-11-27 Thread Robert Metzger
Maybe there is a maven mirror you can access from your network?

This site contains a list of some mirrors
http://stackoverflow.com/questions/5233610/what-are-the-official-mirrors-of-the-maven-central-repository
You don't have to use the maven tool, you can also manually browse for the
jars and download what you need.


On Fri, Nov 27, 2015 at 2:46 PM, Fabian Hueske  wrote:

> You can always build Flink from source, but apart from that I am not aware
> of an alternative.
>
> 2015-11-27 14:42 GMT+01:00 Radu Tudoran :
>
>> Hi,
>>
>>
>>
>> Is there any alternative to avoiding maven?
>>
>> That is why I was curious if there is a binary distribution of this
>> available for download directly
>>
>>
>>
>> Dr. Radu Tudoran
>>
>> Research Engineer
>>
>> IT R&D Division
>>
>>
>>
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>
>> European Research Center
>>
>> Riesstrasse 25, 80992 München
>>
>>
>>
>> E-mail: *radu.tudo...@huawei.com *
>>
>> Mobile: +49 15209084330
>>
>> Telephone: +49 891588344173
>>
>>
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>>
>> This e-mail and its attachments contain confidential information from
>> HUAWEI, which is intended only for the person or entity whose address is
>> listed above. Any use of the information contained herein in any way
>> (including, but not limited to, total or partial disclosure, reproduction,
>> or dissemination) by persons other than the intended recipient(s) is
>> prohibited. If you receive this e-mail in error, please notify the sender
>> by phone or email immediately and delete it!
>>
>>
>>
>> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
>> *Sent:* Friday, November 27, 2015 2:41 PM
>> *To:* user@flink.apache.org
>> *Subject:* Re: flink connectors
>>
>>
>>
>> Hi Radu,
>>
>> the connectors are available in Maven Central.
>>
>> Just add them as a dependency in your project and they will be fetched
>> and included.
>>
>> Best, Fabian
>>
>>
>>
>> 2015-11-27 14:38 GMT+01:00 Radu Tudoran :
>>
>> Hi,
>>
>>
>>
>> I was trying to use flink connectors. However, when I tried to import this
>>
>>
>>
>> import org.apache.flink.streaming.connectors.*;
>>
>>
>>
>> I saw that they are not present in the binary distribution as downloaded
>> from website (flink-dist-0.10.0.jar). Is this intentionally? Is there also
>> a binary distribution that contains these connectors?
>>
>>
>>
>> Regards,
>>
>>
>>
>> Dr. Radu Tudoran
>>
>> Research Engineer
>>
>> IT R&D Division
>>
>>
>>
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>
>> European Research Center
>>
>> Riesstrasse 25, 80992 München
>>
>>
>>
>> E-mail: *radu.tudo...@huawei.com *
>>
>> Mobile: +49 15209084330
>>
>> Telephone: +49 891588344173
>>
>>
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>>
>> This e-mail and its attachments contain confidential information from
>> HUAWEI, which is intended only for the person or entity whose address is
>> listed above. Any use of the information contained herein in any way
>> (including, but not limited to, total or partial disclosure, reproduction,
>> or dissemination) by persons other than the intended recipient(s) is
>> prohibited. If you receive this e-mail in error, please notify the sender
>> by phone or email immediately and delete it!
>>
>>
>>
>>
>>
>
>


Continuing from the stackoverflow post

2015-11-27 Thread Nirmalya Sengupta
Hello Fabian/Matthias,

Many thanks for showing interest in my query on SOF. That helps me sustain
my enthusiasm. :-)

After setting parallelism of environment to '1' and replacing _max()_ with
_maxBy()_, I get a list of maximum temperatures but I fail to explain to
myself how Flink arrives at those figures (attached below). I
understand that different runs will possibly generate different results,
because I am using **ProcessingTime** characteristic. Yet, I expect some
kind of a deterministic output which I don't see.

Please prod me to the right direction.

Here's the code I have been referring to:

-

case class IncomingDataUnit (
  sensorUUID: String, radiationLevel: Int, photoSensor: Float,
  humidity: Float, timeStamp: Long, ambientTemperature: Float)
  extends Serializable { }


object SocketTextStreamWordCount {

  def main(args: Array[String]) {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setParallelism(1)

val readings =
  readIncomingReadings(env,"./sampleIOTTiny.csv")
  .map(e => (e.sensorUUID,e.ambientTemperature))
  .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
  .trigger(CountTrigger.of(5))
  .evictor(CountEvictor.of(4))
  .maxBy(1)

readings.print

env.execute("Scala IOT Stream  experiment Example")

  }

  private def readIncomingReadings(env: StreamExecutionEnvironment,
  inputPath: String): DataStream[IncomingDataUnit] = {
env.readTextFile(inputPath).map(datum => {
  val fields = datum.split(",")
  IncomingDataUnit(
fields(0),  // sensorUUID
fields(1).toInt,// radiationLevel
fields(2).toFloat,  // photoSensor
fields(3).toFloat,  // humidity
fields(4).toLong,   // timeStamp
fields(5).toFloat   // ambientTemperature
  )
})
  }

 }

-

Here's the dataset:


probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
probe-dccefede,199,749.25,78.6057,1448028160,27.46
probe-f29f9662,199,821.81,81.7831,1448028160,22.35
probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
probe-4d78b545,204,778.42,78.412,1448028160,25.92
probe-400c5cdf,204,711.65,73.585,1448028160,22.18
probe-df2d4cad,199,820.8,72.936,1448028161,16.18
probe-f4ef109e,199,785.68,77.5647,1448028161,16.36
probe-3fac3350,200,720.12,78.2073,1448028161,19.19
probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
probe-2323,197,816.06,84.0816,1448028161,4.405
probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
probe-20c609fb,204,804.37,84.5243,1448028161,22.87
probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
probe-960906ca,197,797.63,77.4359,1448028162,27.62
-

And here's the output:

-

(probe-6c75cfbe,30.02)
(probe-42a9ddca,22.07)
(probe-960906ca,27.62)
(probe-400c5cdf,22.18)
(probe-f076c2b0,29.37)
(probe-6c75cfbe,30.02)
(probe-960906ca,27.62)

-


-- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Re: flink connectors

2015-11-27 Thread Fabian Hueske
You can always build Flink from source, but apart from that I am not aware
of an alternative.

2015-11-27 14:42 GMT+01:00 Radu Tudoran :

> Hi,
>
>
>
> Is there any alternative to avoiding maven?
>
> That is why I was curious if there is a binary distribution of this
> available for download directly
>
>
>
> Dr. Radu Tudoran
>
> Research Engineer
>
> IT R&D Division
>
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
> European Research Center
>
> Riesstrasse 25, 80992 München
>
>
>
> E-mail: *radu.tudo...@huawei.com *
>
> Mobile: +49 15209084330
>
> Telephone: +49 891588344173
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* Friday, November 27, 2015 2:41 PM
> *To:* user@flink.apache.org
> *Subject:* Re: flink connectors
>
>
>
> Hi Radu,
>
> the connectors are available in Maven Central.
>
> Just add them as a dependency in your project and they will be fetched and
> included.
>
> Best, Fabian
>
>
>
> 2015-11-27 14:38 GMT+01:00 Radu Tudoran :
>
> Hi,
>
>
>
> I was trying to use flink connectors. However, when I tried to import this
>
>
>
> import org.apache.flink.streaming.connectors.*;
>
>
>
> I saw that they are not present in the binary distribution as downloaded
> from website (flink-dist-0.10.0.jar). Is this intentionally? Is there also
> a binary distribution that contains these connectors?
>
>
>
> Regards,
>
>
>
> Dr. Radu Tudoran
>
> Research Engineer
>
> IT R&D Division
>
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
> European Research Center
>
> Riesstrasse 25, 80992 München
>
>
>
> E-mail: *radu.tudo...@huawei.com *
>
> Mobile: +49 15209084330
>
> Telephone: +49 891588344173
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
>
>
>


RE: flink connectors

2015-11-27 Thread Radu Tudoran
Hi,

Is there any alternative that avoids Maven?
That is why I was curious if there is a binary distribution of this available 
for download directly

Dr. Radu Tudoran
Research Engineer
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Friday, November 27, 2015 2:41 PM
To: user@flink.apache.org
Subject: Re: flink connectors

Hi Radu,
the connectors are available in Maven Central.
Just add them as a dependency in your project and they will be fetched and 
included.
Best, Fabian

2015-11-27 14:38 GMT+01:00 Radu Tudoran 
mailto:radu.tudo...@huawei.com>>:
Hi,

I was trying to use flink connectors. However, when I tried to import this

import org.apache.flink.streaming.connectors.*;

I saw that they are not present in the binary distribution as downloaded from 
the website (flink-dist-0.10.0.jar). Is this intentional? Is there also a binary 
distribution that contains these connectors?

Regards,

Dr. Radu Tudoran
Research Engineer
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!




Re: flink connectors

2015-11-27 Thread Fabian Hueske
Hi Radu,

the connectors are available in Maven Central.
Just add them as a dependency in your project and they will be fetched and
included.

Best, Fabian

2015-11-27 14:38 GMT+01:00 Radu Tudoran :

> Hi,
>
>
>
> I was trying to use flink connectors. However, when I tried to import this
>
>
>
> import org.apache.flink.streaming.connectors.*;
>
>
>
> I saw that they are not present in the binary distribution as downloaded
> from website (flink-dist-0.10.0.jar). Is this intentionally? Is there also
> a binary distribution that contains these connectors?
>
>
>
> Regards,
>
>
>
> Dr. Radu Tudoran
>
> Research Engineer
>
> IT R&D Division
>
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
> European Research Center
>
> Riesstrasse 25, 80992 München
>
>
>
> E-mail: *radu.tudo...@huawei.com *
>
> Mobile: +49 15209084330
>
> Telephone: +49 891588344173
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
>


flink connectors

2015-11-27 Thread Radu Tudoran
Hi,

I was trying to use flink connectors. However, when I tried to import this

import org.apache.flink.streaming.connectors.*;

I saw that they are not present in the binary distribution as downloaded from 
the website (flink-dist-0.10.0.jar). Is this intentional? Is there also a binary 
distribution that contains these connectors?

Regards,

Dr. Radu Tudoran
Research Engineer
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!



Re: Cleanup of OperatorStates?

2015-11-27 Thread Niels Basjes
Hi,

Thanks for the explanation.
I have clickstream data arriving in real time and I need to assign the
visitId and stream it out again (with the visitId now being part of the
record) into Kafka with the lowest possible latency.
Although the Window feature allows me to group and close the visit on a
timeout/expire (as shown to me by Aljoscha in a separate email), it does
make a 'window'.

So (as requested) I created a ticket for such a feature:
https://issues.apache.org/jira/browse/FLINK-3089

Niels






On Fri, Nov 27, 2015 at 11:51 AM, Stephan Ewen  wrote:

> Hi Niels!
>
> Currently, state is released by setting the value for the key to null. If
> you are tracking web sessions, you can try and send an "end of session"
> element that sets the value to null.
>
> To be on the safe side, you probably want state that is automatically
> purged after a while. I would look into using Windows for that. The
> triggers there are flexible so you can schedule both actions on elements
> plus cleanup after a certain time delay (clock time or event time).
>
> The question about "state expiry" has come a few times. People seem to
> like working on state directly, but it should clean up automatically.
>
> Can you see if your use case fits onto windows, otherwise open a ticket
> for state expiry?
>
> Greetings,
> Stephan
>
>
> On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes  wrote:
>
>> Hi,
>>
>> I'm working on a streaming application that ingests clickstream data.
>> In a specific part of the flow I need to retain a little bit of state per
>> visitor (i.e. keyBy(sessionid) )
>>
>> So I'm using the Key/Value state interface (i.e. OperatorState)
>> in a map function.
>>
>> Now in my application I expect to get a huge number of sessions per day.
>> Since these sessionids are 'random' and become unused after the visitor
>> leaves the website over time the system will have seen millions of those
>> sessionids.
>>
>> So I was wondering: how are these OperatorStates cleaned?
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Working with the Windowing functionality

2015-11-27 Thread Niels Basjes
Hi,

Thanks for all this input.
I didn't know about the
  // a trigger can only have 1 timer so we remove the old trigger when
setting the new one

This insight is to me of major importance.
Let me explain:
I found in the WindowOperator this code below.

@Override
public void registerEventTimeTimer(long time) {
   if (watermarkTimer == time) {
      // we already have set a trigger for that time
      return;
   }
   Set<Context> triggers = watermarkTimers.get(time);
   if (triggers == null) {
      triggers = new HashSet<>();
      watermarkTimers.put(time, triggers);
   }
   this.watermarkTimer = time;
   triggers.add(this);
}


and

if (time == watermarkTimer) {
   watermarkTimer = -1;
   Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);


Effectively, every new timer value is stored; yet at the moment a superseded
timer fires, the call is not forwarded into the application.
So if I did it as you show in your example, I would have the same
number of timer entries in the watermarkTimers set as I have seen
events.
My application will (in total) handle about 50K events/sec, resulting
in thousands of 'onEventTime' calls per second.

So thank you. I now understand I have to be more careful with these timers!.

Niels Basjes




On Fri, Nov 27, 2015 at 11:28 AM, Aljoscha Krettek 
wrote:

> Hi Niels,
> do the records that arrive from Kafka already have the session ID or do
> you want to assign them inside your Flink job based on the idle timeout?
>
> For the rest of your problems you should be able to get by with what Flink
> provides:
>
> The triggering can be done using a custom Trigger that fires after we
> haven’t seen an element for 30 minutes.
> public class TimeoutTrigger implements Trigger<Object, Window> {
>private static final long serialVersionUID = 1L;
>
>@Override
>public TriggerResult onElement(Object element, long timestamp, Window
> window, TriggerContext ctx) throws Exception {
>   // on every element it will set a timer for 30 seconds in the future
>   // a trigger can only have 1 timer so we remove the old trigger when
> setting the new one
>   ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 30000);
> // this is 30 seconds but you can change it
>   return TriggerResult.CONTINUE;
>}
>
>@Override
>public TriggerResult onEventTime(long time, Window window,
> TriggerContext ctx) {
>   return TriggerResult.CONTINUE;
>}
>
>@Override
>public TriggerResult onProcessingTime(long time, Window window,
> TriggerContext ctx) throws Exception {
>   return TriggerResult.FIRE_AND_PURGE;
>}
>
>@Override
>public String toString() {
>   return "TimeoutTrigger()";
>}
> }
>
> you would use it like this:
> stream.keyBy(…).window(…).trigger(new TimeoutTrigger())
>
> For writing to files you could use the RollingSink (
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#hadoop-filesystem).
> I think this does pretty much what you want. You can specify how large the
> files that it writes are, and it can also roll to new files on a specified
> time interval.
>
> Please let us know if you need more information.
>
> Cheers,
> Aljoscha
> > On 26 Nov 2015, at 22:13, Niels Basjes  wrote:
> >
> > Hi,
> >
> > I'm trying to build something in Flink that relies heavily on the
> Windowing features.
> >
> > In essence what I want to build:
> > I have clickstream data coming in via Kafka. Each record (click) has a
> sessionid and a timestamp.
> > I want to create a window for each session and after 30 minutes idle I
> want all events for that session (visit) to be written to disk.
> > This should result in the effect that a specific visit exists in exactly
> one file.
> > Since HDFS does not like 'small files' I want to create a (set of) files
> every 15 minutes that contains several complete  visits.
> > So I need to buffer the 'completed visits' and flush them to disk in 15
> minute batches.
> >
> > What I think I need to get this is:
> > 1) A map function that assigns the visit-id (i.e. new id after 30
> minutes idle)
> > 2) A window per visit-id (close the window 30 minutes after the last
> click)
> > 3) A window per 15 minutes that only contains windows of visits that are
> complete
> >
> > Today I've been trying to get this setup and I think I have some parts
> that are in the right direction.
> >
> > I have some questions and I'm hoping you guys can help me:
> >
> > 1) I have trouble understanding the way a windowed stream works
> "exactly".
> > As a consequence I'm having a hard time verifying if my code does what I
> understand it should do.
> > I guess what would really help me is a very simple example on how to
> unittest such a window.
> >
> > 2) Is what I describe above perhaps already been done before? If so; any
> pointers are really appreciated.
> >
> > 3) Am I working in the right direction for what I'm trying to achieve;
> or should I use a different API? a different approach?

Interpretation of Trigger and Eviction on a window

2015-11-27 Thread Nirmalya Sengupta
Hello there.

I have just started exploring Apache Flink, and it has immediately got me
excited. Because I am a beginner, my questions may be a bit naive. Please
bear with me.

I refer to this particular sentence from Flink 0.10.0 Guide

:

' *After the trigger fires, and before the function (e.g., sum, count) is
applied to the window contents, an optional Evictor removes some elements
from the beginning of the window before the remaining elements are passed
on to the function* '

I am a bit confused with the assertion that elements are evicted *before*
the function is applied. Let me elaborate what my understanding is.

Let us say that my window has a 'count' trigger of 10 elements, with some
time-pane of 2 seconds (assumption: rate of ingestion is fast enough for at
least 10 elements to arrive within 2 seconds).

windowedStream.trigger(CountTrigger.of(10)).evictor(CountEvictor.of(10)).sum(_._1)
// summation of the 2nd field of a tuple

Now, till the time 9 elements have gathered in the window, the trigger is
dormant. After the 10th element  enters the window-pane, the trigger is
fired. At this point in time, all these 10 elements should be passed to the
_sum_ function so that the correct summed value is generated and **only
then** the evictor is allowed to take out all the 10 elements leaving the
window-pane empty. The window's element count is set to zero and  it awaits
the arrival of the next element.

However, what the documentation seems to suggest is that the evictor will be
able to take out _some_ (how many?) elements from the _beginning_ of the
window, before the _sum_ function can see the elements. Isn't this
counterintuitive or am I missing something obvious here?

Will keenly wait to hear from you.

-- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Re: Cleanup of OperatorStates?

2015-11-27 Thread Stephan Ewen
Hi Niels!

Currently, state is released by setting the value for the key to null. If
you are tracking web sessions, you can try and send an "end of session"
element that sets the value to null.
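
A minimal sketch of that idea, assuming events of (sessionId, payload) and the
OperatorState API used elsewhere on this list (the "END" marker payload is a
made-up convention):

public class SessionStateMap extends RichMapFunction<Tuple2<String, String>, Tuple2<String, String>> {

    private OperatorState<Long> clicksInSession;

    @Override
    public Tuple2<String, String> map(Tuple2<String, String> event) throws Exception {
        if ("END".equals(event.f1)) {
            clicksInSession.update(null); // releases the state for this key
        } else {
            clicksInSession.update(clicksInSession.value() + 1);
        }
        return event;
    }

    @Override
    public void open(Configuration config) {
        clicksInSession = getRuntimeContext().getKeyValueState("clicks", Long.class, 0L);
    }
}

It would run on a keyed stream, e.g. stream.keyBy(0).map(new SessionStateMap()).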

To be on the safe side, you probably want state that is automatically
purged after a while. I would look into using Windows for that. The
triggers there are flexible so you can schedule both actions on elements
plus cleanup after a certain time delay (clock time or event time).

The question about "state expiry" has come a few times. People seem to like
working on state directly, but it should clean up automatically.

Can you see if your use case fits onto windows, otherwise open a ticket for
state expiry?

Greetings,
Stephan


On Thu, Nov 26, 2015 at 10:42 PM, Niels Basjes  wrote:

> Hi,
>
> I'm working on a streaming application that ingests clickstream data.
> In a specific part of the flow I need to retain a little bit of state per
> visitor (i.e. keyBy(sessionid) )
>
> So I'm using the Key/Value state interface (i.e. OperatorState)
> in a map function.
>
> Now in my application I expect to get a huge number of sessions per day.
> Since these sessionids are 'random' and become unused after the visitor
> leaves the website over time the system will have seen millions of those
> sessionids.
>
> So I was wondering: how are these OperatorStates cleaned?
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: Doubt about window and count trigger

2015-11-27 Thread Aljoscha Krettek
Hi Anwar,
what Fabian wrote is completely right. I just want to give the reasoning for 
why the CountTrigger behaves as it does. The idea was to have Triggers that 
clearly focus on one thing and then at some point add combination triggers. For 
example, an OrTrigger that fires if either of its sub-triggers fires, or 
an AndTrigger that fires only after both its sub-triggers have fired. (There is also 
more complex stuff that could be thought of here.)
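
A minimal sketch of the OrTrigger idea, assuming Flink 0.10's Trigger API (no
such combination trigger shipped with Flink at the time; a standalone PURGE
result is ignored for brevity):

public class OrTrigger<T, W extends Window> implements Trigger<T, W> {
    private static final long serialVersionUID = 1L;

    private final Trigger<T, W> first;
    private final Trigger<T, W> second;

    public OrTrigger(Trigger<T, W> first, Trigger<T, W> second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window,
            TriggerContext ctx) throws Exception {
        return merge(first.onElement(element, timestamp, window, ctx),
                second.onElement(element, timestamp, window, ctx));
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return merge(first.onEventTime(time, window, ctx),
                second.onEventTime(time, window, ctx));
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception {
        return merge(first.onProcessingTime(time, window, ctx),
                second.onProcessingTime(time, window, ctx));
    }

    // fire if either sub-trigger fires; FIRE_AND_PURGE wins over plain FIRE
    private static TriggerResult merge(TriggerResult a, TriggerResult b) {
        if (a == TriggerResult.FIRE_AND_PURGE || b == TriggerResult.FIRE_AND_PURGE) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        return (a == TriggerResult.FIRE || b == TriggerResult.FIRE)
                ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }
}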

Cheers,
Aljoscha
> On 27 Nov 2015, at 09:59, fhue...@gmail.com wrote:
> 
>  
> Hi,
>  
> a regular tumbling time window of 5 seconds gets all elements within that 
> period of time (semantics of time varies for processing, ingestion, and event 
> time modes) and triggers the execution after 5 seconds.
>  
> If you define a custom trigger, the assignment policy remains the same, but 
> the trigger condition is overwritten (it is NOT additional but replaces the 
> default condition), i.e., in your implementation, it will only trigger when 
> 100 elements arrived. In order to trigger also when the window time expires, 
> you have to register a timer (processing time or event time timer) via the 
> trigger context.
> NOTE: The window assigner will continue to assign elements to the window, 
> even if the window was already evaluated. If you PURGE the window and an 
> element arrives after that, a new window is created.
>  
> To implement your trigger, you have to register a timer in the onElement() 
> method with:
> ctx.registerEventTimeTimer(window.getEnd)
> You can do that in every onElement() call, because the timer is always 
> overwritten.
>  
> NOTE: you should use Flink’s keyed-state (access via triggerContext) if you 
> want to keep state such as the current count.
>  
> Hope this helps. Please let me know if you have further questions.
> Fabian
>  
>  
>  
> 
> From: Matthias J. Sax
> Sent: Friday, November 27, 2015 08:44
> To: user@flink.apache.org
> Subject: Re: Doubt about window and count trigger
>  
>  
> Hi,
>  
> a Trigger is an *additional* condition for intermediate (early)
> evaluation of the window. Thus, it is not "or-ed" to the basic window
> definition.
>  
> If you want to have an or-ed window condition, you can customize it by
> specifying your own window definition.
>  
> > dataStream.window(new MyOwnWindow() { /* put your 
> > code here */ }); // where MyOwnWindow extends WindowAssigner
>  
> -Matthias
>  
>  
> On 11/26/2015 11:40 PM, Anwar Rizal wrote:
> > Hi all,
> > 
> > From the documentation:
> > "The |Trigger| specifies when the function that comes after the window
> > clause (e.g., |sum|, |count|) is evaluated (“fires”) for each window."
> > 
> > So, basically, if I specify:
> > 
> > |keyedStream
> > .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))
> > .trigger(CountTrigger.of(100))|
> > 
> > |
> > |
> > 
> > |The execution of the window function is triggered when the count reaches 
> > 100 in the time window of 5 seconds. If you have a system that never 
> > reaches 100 in 5 seconds, basically you will never have the window fired.|
> > 
> > |
> > |
> > 
> > |My question is, what would be the best option to have behavior as follow:|
> > 
> > |The execution of the window function is triggered when 5 seconds is 
> > reached or 100 events are received before 5 seconds.|
> > 
> > 
> > I think of implementing my own trigger that looks like CountTrigger, but 
> > that will fire also when the end of time window is reached (at the moment, 
> > it just returns Continue, instead of Fired). But maybe there's a better way 
> > ? 
> > 
> > Is there a reason why CountTrigger is implemented as it is implemented 
> > today, and not as I described above (5 seconds or 100 events reached, 
> > whichever comes first).
> > 
> > 
> > Thanks,
> > 
> > Anwar.
> > 



Re: graph problem to be solved

2015-11-27 Thread Stephan Ewen
Hi!

Yes, looks like quite a graph problem. The best way to get started with
that is to have a look at Gelly:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/libs/gelly_guide.html
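
For instance, a minimal Gelly sketch, assuming the distances are stored as
edge values (the sample edges are made up):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// edges of (source point, target point, distance)
DataSet<Edge<Long, Double>> edges = env.fromElements(
        new Edge<>(1L, 2L, 4.2),
        new Edge<>(2L, 3L, 1.0),
        new Edge<>(1L, 3L, 7.5));

Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);

From such a graph you can run Gelly's built-in algorithms or implement your
own path search with its iteration APIs.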

Beware: The problem you describe (all possible paths between all pairs of
points) results in an exponential number of results, which is inherently
neither efficiently computable nor storable for anything but small graphs.


To connect Flink to Cassandra, you can simply use the Hadoop Cassandra
source/sink with Flink:

  - https://wiki.apache.org/cassandra/HadoopSupport
  -
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html#using-hadoop-inputformats

Greetings,
Stephan


On Wed, Nov 25, 2015 at 1:54 PM, RahadianBayu Permadi <
rahadian.bayu.perm...@gmail.com> wrote:

> Greetings,
>
> I am a newbie in this flink world. Thanks to Slim Baltagi for recommending
> this Flink community.
>
> I have a graph problem. So I have some points and paths among those
> points. Each path has a value, like distance, that determines the distance
> between the two points it connects.
>
> So far it's quite a graph representation. I want to create a system that
> can find all possible paths (not just the shortest path) from point A to
> point B (any pair of points in the graph).
>
> My questions are:
> 1. How would flink solve this?
> 2.  Any suggestion data storage for this? Any suggestion on how to use
> Cassandra with flink?
>
> Thanks in advanced.
>
> Best Regards,
> Bayu
>


Re: Working with the Windowing functionality

2015-11-27 Thread Aljoscha Krettek
Hi Niels,
do the records that arrive from Kafka already have the session ID or do you 
want to assign them inside your Flink job based on the idle timeout?

For the rest of your problems you should be able to get by with what Flink 
provides:

The triggering can be done using a custom Trigger that fires after we haven’t 
seen an element for 30 minutes.
public class TimeoutTrigger implements Trigger<Object, Window> {
   private static final long serialVersionUID = 1L;

   @Override
   public TriggerResult onElement(Object element, long timestamp, Window 
window, TriggerContext ctx) throws Exception {
  // on every element it will set a timer for 30 seconds in the future
  // a trigger can only have 1 timer so we remove the old trigger when 
setting the new one
  ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 30000); // this is 30 seconds but you can change it
  return TriggerResult.CONTINUE;
   }

   @Override
   public TriggerResult onEventTime(long time, Window window, TriggerContext 
ctx) {
  return TriggerResult.CONTINUE;
   }

   @Override
   public TriggerResult onProcessingTime(long time, Window window, 
TriggerContext ctx) throws Exception {
  return TriggerResult.FIRE_AND_PURGE;
   }

   @Override
   public String toString() {
  return "TimeoutTrigger()";
   }
}

you would use it like this:
stream.keyBy(…).window(…).trigger(new TimeoutTrigger())

For writing to files you could use the RollingSink 
(https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#hadoop-filesystem).
 I think this does pretty much what you want. You can specify how large the 
files that it writes are, and it can also roll to new files on a specified time 
interval.
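
A rough usage sketch (the base path, bucket format, and batch size are
illustrative):

RollingSink<String> sink = new RollingSink<String>("/base/path/visits");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")); // new bucket directory per time interval
sink.setBatchSize(128 * 1024 * 1024);                       // roll to a new part file after ~128 MB
stream.addSink(sink);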

Please let us know if you need more information.

Cheers,
Aljoscha
> On 26 Nov 2015, at 22:13, Niels Basjes  wrote:
> 
> Hi,
> 
> I'm trying to build something in Flink that relies heavily on the Windowing 
> features.
> 
> In essence what I want to build:
> I have clickstream data coming in via Kafka. Each record (click) has a 
> sessionid and a timestamp.
> I want to create a window for each session and after 30 minutes idle I want 
> all events for that session (visit) to be written to disk.
> This should result in the effect that a specific visit exists in exactly one 
> file.
> Since HDFS does not like 'small files' I want to create a (set of) files 
> every 15 minutes that contains several complete  visits.
> So I need to buffer the 'completed visits' and flush them to disk in 15 
> minute batches.
> 
> What I think I need to get this is:
> 1) A map function that assigns the visit-id (i.e. new id after 30 minutes 
> idle)
> 2) A window per visit-id (close the window 30 minutes after the last click) 
> 3) A window per 15 minutes that only contains windows of visits that are 
> complete 
> 
> Today I've been trying to get this setup and I think I have some parts that 
> are in the right direction.
> 
> I have some questions and I'm hoping you guys can help me:
> 
> 1) I have trouble understanding the way a windowed stream works "exactly". 
> As a consequence I'm having a hard time verifying if my code does what I 
> understand it should do. 
> I guess what would really help me is a very simple example on how to unittest 
> such a window.
> 
> 2) Is what I describe above perhaps already been done before? If so; any 
> pointers are really appreciated.
> 
> 3) Am I working in the right direction for what I'm trying to achieve; or 
> should I use a different API? a different approach?
> 
> Thanks
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
> 
> 



Re: Working with State example /flink streaming

2015-11-27 Thread Aljoscha Krettek
Hi,
I’ll try to go into a bit more detail about the windows here. What you can do 
is this:

DataStream<Tuple3<String, Double, Long>> input = … // fields are (id, sum, count), where count is initialized to 1, similar to word count

DataStream<Tuple3<String, Double, Long>> counts = input
  .keyBy(0)
  .timeWindow(Time.minutes(10))
  .reduce(new MyCountingReducer())

DataStream<Tuple2<String, Double>> result = counts.map( … )

Does this help? Here, you don’t even have to deal with state yourself; the windowing 
system will keep the reduced value in internal state in a fault-tolerant fashion.
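
Under the assumed Tuple3 layout above, MyCountingReducer could simply add up
the sums and counts per key, for example:

public static class MyCountingReducer implements ReduceFunction<Tuple3<String, Double, Long>> {
    @Override
    public Tuple3<String, Double, Long> reduce(
            Tuple3<String, Double, Long> a,
            Tuple3<String, Double, Long> b) {
        return new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2);
    }
}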

Cheers,
Aljoscha
> On 26 Nov 2015, at 17:14, Stephan Ewen  wrote:
> 
> Hi!
> 
> In streaming, there is no "end" of the stream when you would emit the final 
> sum. That's why there are windows.
> 
> If you do not want the partial sums, but only the final sum, you need to 
> define what window in which the sum is computed. At the end of that window, 
> that value is emitted. The window can be based on time, counts, or other 
> measures.
> 
> Greetings,
> Stephan
> 
> 
> On Thu, Nov 26, 2015 at 4:07 PM, Lopez, Javier  
> wrote:
> Hi, thanks for the answer. It worked but not in the way we expected. We 
> expect to have only one sum per ID and we are getting all the consecutive 
> sums, for example:
> 
> We expect this: (11,6) but we get this (11,1) (11,3) (11,6) (the initial 
> values are ID -> 11, values -> 1,2,3). Here is the code we are using for our 
> test:
> 
> DataStream<Tuple2<String, Double>> stream = ...;
> 
> DataStream<Tuple4<String, Double, Long, Double>> result =
>     stream.keyBy(0).map(new RollingSum());
> 
> public static class RollingSum extends RichMapFunction<Tuple2<String, Double>,
>     Tuple4<String, Double, Long, Double>> {
> 
>     // persistent sum and count
>     private OperatorState<Double> sum;
>     private OperatorState<Long> count;
> 
>     @Override
>     public Tuple4<String, Double, Long, Double> map(Tuple2<String, Double> value1) {
>         try {
>             Double newSum = sum.value() + value1.f1;
> 
>             sum.update(newSum);
>             count.update(count.value() + 1);
>             return new Tuple4<String, Double, Long, Double>(
>                 value1.f0, sum.value(), count.value(), sum.value() / count.value());
>         } catch (IOException e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
> 
>         return null;
>     }
> 
>     @Override
>     public void open(Configuration config) {
>         sum = getRuntimeContext().getKeyValueState("mySum", Double.class, 0D);
>         count = getRuntimeContext().getKeyValueState("myCounter", Long.class, 0L);
>     }
> }
> 
> 
> We are using a Tuple4 because we want to calculate the sum and the average 
> (so our tuple is ID, SUM, COUNT, AVG). Do we need to add another step to get 
> a single value out of it, or is this the expected behavior?
> 
> Thanks again for your help.
> 
> On 25 November 2015 at 17:19, Stephan Ewen  wrote:
> Hi Javier!
> 
> You can solve this both using windows, or using manual state.
> 
> What is better depends a bit on when you want to have the result (the sum). 
> Do you want a result emitted after each update (or do some other operation 
> with that value) or do you want only the final sum after a certain time?
> 
> For the second variant, I would use a window, for the first variant, you 
> could use custom state as follows:
> 
> For each element, you take the current state for the key, add the value to 
> get the new sum. Then you update the state with the new sum and emit the 
> value as well...
> 
> Java:
> 
> DataStream<Tuple2<String, Long>> stream = ...;
> 
> DataStream<Tuple2<String, Long>> result = stream.keyBy(0).map(new RollingSum());
> 
> public class RollingSum extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
> 
>     private OperatorState<Long> sum;
> 
>     @Override
>     public Tuple2<String, Long> map(Tuple2<String, Long> value) throws Exception {
>         long newSum = sum.value() + value.f1;
>         sum.update(newSum);
>         return new Tuple2<>(value.f0, newSum);
>     }
> 
>     @Override
>     public void open(Configuration config) {
>         sum = getRuntimeContext().getKeyValueState("mySum", Long.class, 0L);
>     }
> }
> 
> 
> In Scala, you can write this briefly as:
> 
> val stream: DataStream[(String, Int)] = ...
> 
> val counts: DataStream[(String, Int)] = stream
>   .keyBy(_._1)
>   .mapWithState((in: (String, Int), sum: Option[Int]) => {
>     val newSum = in._2 + sum.getOrElse(0)
>     ((in._1, newSum), Some(newSum))
>   })
> 
> Does that help?
> 
> Thanks also for pointing out the error in the sample code...
> 
> Greetings,
> Stephan
> 
> 
> On Wed, Nov 25, 2015 at 4:55 PM, Lopez, Javier  
> wrote:
> Hi,
> 
> We are trying to do a test using state, but we have not been able to achieve 
> our desired result. Basically we have a data stream with data as 
> [{"id":"11","value":123}] and we want to calculate the sum of all values per id.

RE: Doubt about window and count trigger

2015-11-27 Thread fhueske

Hi,

a regular tumbling time window of 5 seconds gets all elements within that 
period of time (the semantics of time vary between the processing, ingestion, 
and event time modes) and triggers the execution after 5 seconds.

If you define a custom trigger, the assignment policy remains the same, but the 
trigger condition is overwritten (it is NOT additional but replaces the default 
condition), i.e., in your implementation, it will only trigger when 100 
elements have arrived. In order to also trigger when the window time expires, you 
have to register a timer (processing time or event time timer) via the trigger 
context.
NOTE: The window assigner will continue to assign elements to the window, even 
if the window was already evaluated. If you PURGE the window and an element 
arrives after that, a new window is created.

To implement your trigger, you have to register a timer in the onElement() 
method with:
ctx.registerEventTimeTimer(window.getEnd())
You can do that in every onElement() call, because the timer is simply 
overwritten each time.

NOTE: you should use Flink’s keyed state (accessed via the TriggerContext) if 
you want to keep state such as the current count. A sketch putting these 
pieces together follows.
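
Putting these points together, a count-or-time trigger might look roughly like 
the following sketch. This is not an official example: the class name and the 
constant are made up, it follows the 0.10-era Trigger interface, and the exact 
TriggerContext signatures should be double-checked against your Flink version.

import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires when 100 elements have arrived OR when the window's end time is
// reached, whichever comes first.
public class CountOrTimeTrigger implements Trigger<Object, TimeWindow> {

    private static final long MAX_COUNT = 100;

    @Override
    public TriggerResult onElement(Object element, long timestamp,
            TimeWindow window, TriggerContext ctx) throws Exception {
        // re-registering the same timer on every element is fine,
        // because it is simply overwritten
        ctx.registerEventTimeTimer(window.getEnd());
        // keep the count in Flink's keyed state, as recommended above
        OperatorState<Long> count = ctx.getKeyValueState("count", 0L);
        long newCount = count.value() + 1;
        if (newCount >= MAX_COUNT) {
            count.update(0L);
            return TriggerResult.FIRE_AND_PURGE;
        }
        count.update(newCount);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // the window's end time was reached before 100 elements arrived
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }
}

It would be used as keyedStream.window(TumblingTimeWindows.of(Time.of(5, 
TimeUnit.SECONDS))).trigger(new CountOrTimeTrigger()), which gives exactly the 
or-ed condition discussed in the thread below.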

Hope this helps. Please let me know if you have further questions.
Fabian




From: Matthias J. Sax
Sent: Friday, November 27, 2015 08:44
To: user@flink.apache.org
Subject: Re: Doubt about window and count trigger


Hi,

a Trigger is an *additional* condition for intermediate (early)
evaluation of the window. Thus, it is not "or-ed" to the basic window
definition.

If you want to have an or-ed window condition, you can customize it by
specifying your own window definition.

> dataStream.window(new MyOwnWindowAssigner());
> // where MyOwnWindowAssigner extends WindowAssigner { /* put your code here */ }

-Matthias


On 11/26/2015 11:40 PM, Anwar Rizal wrote:
> Hi all,
> 
> From the documentation:
> "The |Trigger| specifies when the function that comes after the window
> clause (e.g., |sum|, |count|) is evaluated (“fires”) for each window."
> 
> So, basically, if I specify:
> 
> keyedStream
>     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
>     .trigger(CountTrigger.of(100))
> 
> The execution of the window function is triggered when the count reaches 100 
> within the 5-second time window. If you have a system that never reaches 100 
> events in 5 seconds, the window will basically never fire.
> 
> My question is: what would be the best option to get the following behavior?
> 
> The execution of the window function is triggered when 5 seconds have passed, 
> or when 100 events have been received before those 5 seconds are up.
> 
> 
> I am thinking of implementing my own trigger that looks like CountTrigger, but 
> that also fires when the end of the time window is reached (at the moment it 
> just returns CONTINUE instead of FIRE). But maybe there's a better way?
> 
> Is there a reason why CountTrigger is implemented the way it is today, and 
> not as I described above (5 seconds or 100 events, whichever comes first)?
> 
> 
> Thanks,
> 
> Anwar.
>