How to use 'dynamic' state

2017-03-06 Thread Steve Jerman
I’ve been reading the code/user group/SO and haven’t really found a great answer 
to this… so I thought I’d ask.

I have a UI that allows the user to edit rules that include specific criteria, 
for example: trigger an event if X people are present for over a minute.

I would like to have a Flink job that processes an event stream and triggers on 
these rules.

The catch is that I don’t want to have to restart the job if the rules change… 
(it would be easy otherwise :))

So I found four ways to proceed:

* API based stop and restart of job … ugly.

* Use a co-map function with the rules as one stream and the events as the 
other. This seems better; however, I would like to have several ‘trigger’ 
functions changed together, e.g. a tumbling window for one type of criteria 
and a flat map for a different sort, so I’m not sure how to hook this up for 
more than a simple co-map/flatmap. I did see this suggested in one answer.

* Use broadcast state: this seems reasonable, however I couldn’t tell whether 
the broadcast state would be available to all of the processing functions. Is 
it generally available?

* Implement my own operators… seems complicated ;)

Are there other approaches?

Thanks for any advice
Steve

Re: How to use 'dynamic' state

2017-03-07 Thread Steve Jerman
Thanks for your reply. It makes things much clearer for me. I think you are 
right - Side Inputs are probably the right way long term (I looked at the FLIP 
definition), but I think I can construct something in the meantime.

Steve

On Mar 7, 2017, at 6:11 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Steve,
I think Gordon already gave a pretty good answer; I'll just try to go into the 
specifics a bit.

You mentioned that there would be different kinds of operators required for the 
rules (you hinted at FlatMap and maybe a tumbling window). Do you know each of 
those types before starting your program? If yes, you could have several of 
these "primitive" operations in your pipeline, each of them listening only to 
rule changes (on a second input) that are relevant to its operation.

Side inputs would be very good for this but I think you can also get the same 
level of functionality by using a CoFlatMap (for the window case you would use 
a CoFlatMap chained to a window operation).
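
Roughly, that pattern would look like this (Event, Rule, and Alert stand in for 
your own types, and I'm assuming you already have events and rules streams):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// rules.broadcast() sends every rule update to all parallel instances,
// so each CoFlatMap instance holds the full, current rule set.
DataStream<Alert> alerts = events
    .connect(rules.broadcast())
    .flatMap(new CoFlatMapFunction<Event, Rule, Alert>() {
        private final Map<String, Rule> currentRules = new HashMap<>();

        @Override
        public void flatMap1(Event event, Collector<Alert> out) {
            for (Rule rule : currentRules.values()) {
                if (rule.matches(event)) {
                    out.collect(new Alert(rule, event));
                }
            }
        }

        @Override
        public void flatMap2(Rule rule, Collector<Alert> out) {
            currentRules.put(rule.getId(), rule); // add or replace the rule
        }
    });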

Does that help? I'm sure we can figure something out together.

Best,
Aljoscha


On Tue, Mar 7, 2017, at 07:44, Tzu-Li (Gordon) Tai wrote:
Hi Steve,

I’ll try to provide some input for the approaches you’re currently looking into 
(I’ll comment on your email below):

* API based stop and restart of job … ugly.

Yes, indeed ;) I think this is absolutely not necessary.

* Use a co-map function with the rules as one stream and the events as the 
other. This seems better; however, I would like to have several ‘trigger’ 
functions changed together, e.g. a tumbling window for one type of criteria 
and a flat map for a different sort, so I’m not sure how to hook this up for 
more than a simple co-map/flatmap. I did see this suggested in one answer.

Do you mean that operators listen only to certain rule / criteria settings 
changes? You could either have separate stream sources for each kind of 
criteria rule trigger event, or have a single source and split them 
afterwards. Then, you broadcast each of them to the corresponding co-map / 
flat-maps.
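
As a sketch of the split-then-broadcast wiring (the Rule type, its accessors, 
and the split tags are made up):

import java.util.Collections;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;

// Tag each rule with the kind of operator it configures.
SplitStream<Rule> byKind = ruleSource.split(
    rule -> Collections.singletonList(rule.getKind())); // e.g. "window" or "flatmap"

// Broadcast each branch so every parallel instance sees its rule updates.
DataStream<Rule> windowRules = byKind.select("window").broadcast();
DataStream<Rule> flatMapRules = byKind.select("flatmap").broadcast();

Each branch then connect()s to its corresponding co-map / flat-map, as in the 
CoFlatMap sketch above.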

* Use broadcast state: this seems reasonable, however I couldn’t tell whether 
the broadcast state would be available to all of the processing functions. Is 
it generally available?

From the context of your description, I think what you want is that the 
injected rules stream can be seen by all operators (instead of “broadcast 
state”, which in Flink streaming refers to something else).

Aljoscha recently consolidated a FLIP for Side Inputs [1], which I think is 
targeted exactly at what you have in mind here. Perhaps you can take a look at 
that and see if it makes sense for your use case? Of course, this isn’t yet 
available as it is still under discussion, but Side Inputs may be an ideal 
solution for what you have in mind, as the rule triggers I assume should be 
slowly changing.

I’ve CCed Aljoscha, who can probably provide more insights, as he has worked a 
lot on the features mentioned here.

Cheers,
Gordon

[1] FLIP-17: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API





Question Regarding a sink..

2017-03-23 Thread Steve Jerman
Hi,

I have a sink writing data to InfluxDB. I’ve noticed that the sink gets 
multiple copies of upstream records…

Why does this happen, and how can I avoid it?

Below is a trace showing two sink records (I have a parallelism of two) for 
each record printed by ‘.printToError’ on the same stream.

Any help/suggestions appreciated.

Steve


1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=28:24:0B:B6:42:CA, sessionTime=5000}]
1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=8F:13:AC:4A:DA:93, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=43:7D:8A:D4:7D:D7, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=77:CD:BD:48:EE:D8, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=28:24:0B:B6:42:CA, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=28:24:0B:B6:42:CA, sessionTime=5000}]
18:41:27,352 INFO  com.kloudspot.flink.sink.InfluxSink  
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins

Re: In-Memory data grid

2017-04-03 Thread Steve Jerman
I had a DB lookup to do and used com.google.common.cache.Cache with a 10-second 
timeout:

// key/value types reconstructed here; the archive stripped the generics
private static Cache<String, Location> locationCache =
    CacheBuilder.newBuilder()
        .maximumSize(1000)
        .expireAfterAccess(10, TimeUnit.SECONDS)
        .build();
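
The lookup itself then goes through the cache, e.g. (the lookup method and key 
are placeholders; get() wraps loader failures in an ExecutionException):

// get() consults the cache first and only calls the loader on a miss
Location location = locationCache.get(ap, () -> lookupLocationFromDb(ap));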

Seemed to help a lot with throughput….

Steve

On Apr 3, 2017, at 10:55 AM, nragon <nuno.goncal...@wedotechnologies.com> wrote:

Just to add a scenario.

My current architecture is the following:
I've deployed 4 Ignite nodes in YARN and 5 task managers with 2 GB and 2 slots
each.
As a cache in Ignite I have one record in key/value form (String, Object[]).
My throughput without Ignite is 30k/sec; when I add the lookup I get 3k/sec.

My question is: has anyone used Ignite as a fast lookup, or any other in-memory
data grid? If so, did you have any major performance impact?

Thanks






Using FlinkML from Java?

2017-04-20 Thread Steve Jerman
Hi Folks,

I’m trying to use FlinkML 1.2 from Java … getting this:

SVM svm = new SVM()
  .setBlocks(env.getParallelism())
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)
  .setSeed(42);


svm.fit(labelledTraining);

The type org.apache.flink.api.scala.DataSet cannot be resolved. It is 
indirectly referenced from required .class files.

Are there any tricks required to get it running? Or is Java not supported?
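
My guess is that the compiler simply can't see the Scala API classes: fit() is 
typed against the Scala DataSet, so flink-scala would need to be on the compile 
classpath. Perhaps a dependency along these lines is the missing trick (Scala 
suffix and version are assumptions; match them to your FlinkML artifact):

<dependency>
    <!-- suffix/version assumed; align with the flink-ml artifact in use -->
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.2.0</version>
</dependency>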

Steve


Checkpoints?

2017-06-02 Thread Steve Jerman
Hi,

Configuration:
Flink 1.2.0
I'm using the Rocks DB backend for checkpointing.

The problem I have is that no checkpoints are being deleted, and my disk is 
filling up.

Is there configuration for this?

Thanks
Steve

Re: Checkpoints?

2017-06-02 Thread Steve Jerman
Thanks. Upgraded to 1.2.1 and the problem goes away.

Steve

On Jun 2, 2017, at 10:08 AM, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Steve,

in the past we had some problems with cleaning up old checkpoints. But this was 
in 1.1.x. These problems should be fixed by now.

Could you try upgrading to Flink 1.2.1 in order to see whether the problem 
persists? If this is the case, then it would be great if you could share the 
JobManager logs on debug log level with us.

How long is your checkpoint interval? Deleting files from HDFS/S3 can take 
some time, and if the checkpoint interval is shorter than this time, then the 
system won't be able to delete old checkpoints quickly enough.
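
For reference, a minimal checkpoint setup might look like this (the interval 
and the checkpoint path are only illustrative):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Keep the interval comfortably longer than the time cleanup takes.
        env.enableCheckpointing(60_000);
        // Path is illustrative; the constructor can throw IOException.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    }
}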

Cheers,
Till





Quick Question...

2017-06-22 Thread Steve Jerman
Hi,

I have a quick question…

How do I set the Configuration passed into RichFunction.open?

I *thought* that setting GlobalJobParameters would do it ...

env.getConfig().setGlobalJobParameters(jobParameters);

But it seems not…

Steve



Re: Quick Question...

2017-06-22 Thread Steve Jerman
Thx for the quick answer


From: Chesnay Schepler 
Sent: Thursday, June 22, 2017 12:13:23 PM
To: user@flink.apache.org
Subject: Re: Quick Question...

Hello,

in the DataSet API you can do this when specifying your transformations, 
something along the lines of dataset.map(...).withParameters(config).

In the DataStream API you cannot set the Configuration at all.

Note that in both APIs you can also just pass the Configuration into the 
constructor and
store it in a field.
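
A minimal sketch of the constructor approach (the config key is made up):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ConfiguredMapper extends RichMapFunction<String, String> {
    private final Configuration config; // serialized and shipped with the function
    private transient String prefix;

    public ConfiguredMapper(Configuration config) {
        this.config = config;
    }

    @Override
    public void open(Configuration parameters) {
        // In the DataStream API 'parameters' is always empty; read the field instead.
        prefix = config.getString("prefix", "");
    }

    @Override
    public String map(String value) {
        return prefix + value;
    }
}

// usage: stream.map(new ConfiguredMapper(myConfig));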

Regards,
Chesnay





Advice on debugging state back end...

2017-08-08 Thread Steve Jerman
Hi Folks,


I have a streaming application which is running out of heap space. It looks 
like there might be something up with state storage, but I'm having trouble 
determining whether it just needs a lot of memory or there is a memory leak.


Are there any tips/best practices for this? I've looked at heap dumps and I 
just see lots of tuples... it's not clear which task/operator they belong to...


I'm looking at moving to RocksDB (assuming it's just memory...) but I'd like 
to determine whether there is actually a leak. It doesn't seem to have issues 
with smaller input rates.


For reference, I'm using 1.2.1 and the source is a 4-partition Kafka topic 
with some windowing etc. after that...


Thanks for any pointers/advice.

Steve


Question about Global Windows.

2017-08-13 Thread Steve Jerman
Hi Folks,

I have a question regarding Global Windows.

I have a stream with a large number of records. The records have a key with 
very high cardinality. They also have a state (start, status, finish).

I need to do some processing where I look at the records separated into windows 
using the ‘state’ property.

From the documentation, I believe I should be using a Global Window with a 
custom trigger to identify the windows….

I have this implemented… the Trigger returns CONTINUE for ‘start’, and 
FIRE_AND_PURGE for ‘finish’.

I also need to avoid running out of memory, since sometimes I don’t get 
‘finish’ records… so I added a timer to the Trigger which PURGEs when it 
fires.
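
Roughly like this (the Event type and its state accessor are placeholders; I 
know timers registered by earlier elements will still fire and purge, so a 
real version should track and delete them):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class StateBasedTrigger extends Trigger<Event, GlobalWindow> {
    private final long timeoutMs;

    public StateBasedTrigger(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(Event event, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) {
        // Arm a processing-time timeout so windows that never see a
        // 'finish' record are eventually purged.
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMs);
        return "finish".equals(event.getState())
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.PURGE; // timed out without a 'finish'
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
    }
}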

Is this the correct approach?

I say this since I do in fact see a memory leak …  is there anything else I 
need to be aware of?

Thanks

Steve


Re: Question about Global Windows.

2017-08-17 Thread Steve Jerman
Thank you Nico.


I *think* I should have one stream per key... the stream I get is pretty fast 
and there may be some corner cases I'm not aware of. However, I really need to 
process it as a single window per key.


I am worried about the cardinality of the key... I wanted to use a timeout to 
remove the window for a key; if not, the memory requirements would grow 
quickly (which I think is what is happening). The stream has 60K unique keys 
per 5-minute window (maybe half a million total unique per day...).


Anyway I'll write a test to investigate further...


Thanks for your thoughts


Steve






From: Nico Kruber 
Sent: Wednesday, August 16, 2017 3:22:41 AM
To: user@flink.apache.org
Cc: Steve Jerman
Subject: Re: Question about Global Windows.

Hi Steve,
are you sure a GlobalWindows assigner fits your needs? This may be the case if
all your events always come in order and you do not ever have overlapping
sessions since a GlobalWindows assigner simply puts all events (per key) into
a single window (per key). If you have overlapping sessions, you may need your
own window assigner that handles multiple windows (see the
EventTimeSessionWindows assigner for our take on event-time session windows).
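
For completeness, the GlobalWindows wiring would look roughly like this (the 
types, key selector, and aggregation are placeholders, and the trigger is the 
timeout trigger sketched earlier in the thread):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

DataStream<Summary> perKey = events
    .keyBy((KeySelector<Event, String>) Event::getKey)
    .window(GlobalWindows.create())
    .trigger(new StateBasedTrigger(300_000L)) // 5-minute timeout, as sketched earlier
    .apply(new WindowFunction<Event, Summary, String, GlobalWindow>() {
        @Override
        public void apply(String key, GlobalWindow window,
                          Iterable<Event> input, Collector<Summary> out) {
            out.collect(Summary.of(key, input)); // placeholder aggregation
        }
    });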

Regarding the timer: if you set it via `#registerEventTimeTimer()`, it only
fires if a watermark passes the given timestamp, so you need to make sure your
sources create them (see [1] and its sub-topics). Depending on your further
constraints in your application, it may be ok to use
`registerProcessingTimeTimer()` instead.
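
For the event-time route that means assigning timestamps and watermarks up 
front, e.g. (the Event type and its timestamp accessor are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Allow events to arrive up to 10 seconds out of order.
DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(Event event) {
            return event.getTimestampMillis(); // assumed accessor
        }
    });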

Does this help already? If not, we'd need some (minimal) example of how you're
using these things to debug further into your memory issues.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html



Re: Question about Global Windows.

2017-08-17 Thread Steve Jerman
😊


https://issues.apache.org/jira/browse/FLINK-7473





Question about configuring Rich Functions

2017-10-13 Thread Steve Jerman
This document:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#parsing-command-line-arguments-and-passing-them-around-in-your-flink-application



describes the use of 'withParameters' to pass configuration into RichFunctions. 
Is this supported for streams?

The .withParameters method doesn't seem to exist on stream operations (map, 
window, filter, ...).

Is this DataSet-only? If so, it might be worth clarifying in the docs. And is 
there any way to pass config via the open method in streams? It seems not.
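
For reference, the DataSet usage from that page looks like this (adapted from 
the docs example; I can't find the equivalent hook on DataStream operators):

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class WithParametersExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Configuration config = new Configuration();
        config.setInteger("limit", 16);

        DataSet<Integer> filtered = env.fromElements(1, 29, 40, 10)
            .filter(new RichFilterFunction<Integer>() {
                private int limit;

                @Override
                public void open(Configuration parameters) {
                    // withParameters() is what makes this non-empty (DataSet only).
                    limit = parameters.getInteger("limit", 0);
                }

                @Override
                public boolean filter(Integer value) {
                    return value > limit;
                }
            })
            .withParameters(config);

        filtered.print();
    }
}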


Steve