Appending Windowed Aggregates to Events

2017-06-23 Thread Tim Stearn
Hello All,

I'm *very* new to Flink.  I read through the documentation and played with some 
sample code, but I'm struggling to get started with my requirements.


We want to use Flink to maintain windowed aggregates as part of a transaction 
monitoring application.  These would use sliding window definitions.  An 
example would be:  "Total amount for CASH transactions in the last 5 days".   
Here's what I need my Flink application to do:

1.  Prepare for transaction processing by reading historical aggregates and 
building windows

2.  For each new transaction:

a.  Update the windowed aggregate with the new transaction data

b.  Find the window that matches the incoming time stamp and add the 
aggregate value to the transaction

c.  Send enhanced transaction (original fields + aggregates from matching 
window) to downstream processor via RabbitMQ or Kafka sink

For every transaction coming in, I want one (and only one) output that contains 
the original transaction fields plus the aggregates.

I see how to do the code to create the window assigner and the code that 
incrementally maintains the aggregates.  I'm not sure how I could join this 
back to the original transaction record, appending the aggregate values from 
the window that matches the transaction date stamp.  This seems like a join of 
some kind to me, but I don't know how to implement it in Flink.

I'm hoping someone could reply with some simple code (or even pseudo code) to 
get me started on the "join"  part of the above data flow.  Please let me know 
if I need to clarify.
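
To make the ask more concrete, here is the rough shape I'm picturing for step 2,
ignoring the 5-day window expiry and the historical bootstrap for now (untested,
and the Transaction type and its fields are made up by me):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Made-up transaction type, just so the flow is concrete.
class Transaction {
    public String accountId;
    public String type;
    public double amount;
    public long timestamp;
}

public class EnrichWithAggregate
        extends ProcessFunction<Transaction, Tuple2<Transaction, Double>> {

    // running "CASH in the last 5 days" total; expiring old amounts is not shown here
    private transient ValueState<Double> cashTotal;

    @Override
    public void open(Configuration parameters) {
        cashTotal = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cash-total-5d", Double.class));
    }

    @Override
    public void processElement(Transaction txn, Context ctx,
                               Collector<Tuple2<Transaction, Double>> out) throws Exception {
        Double total = cashTotal.value();
        if (total == null) {
            total = 0.0;                      // 1. or seed from the historical aggregates
        }
        if ("CASH".equals(txn.type)) {
            total += txn.amount;              // 2a. update the aggregate with the new transaction
        }
        cashTotal.update(total);
        out.collect(Tuple2.of(txn, total));   // 2b/2c. exactly one enriched record per transaction
    }
}

I imagine this would run after a keyBy on the account and feed the Kafka/RabbitMQ
sink, but I'd appreciate confirmation that something like this is the right
direction, or a pointer to a proper windowed way of doing it.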

Thanks,

Tim Stearn



Re: Looking for Contributors: Apache Flink meets Apache Mesos and DC/OS

2017-06-23 Thread Till Rohrmann
A quick addition concerning the time. The office hour will take place on
the 29th of June at 10am PST.

Cheers,
Till

On Wed, Jun 21, 2017 at 10:16 PM, Till Rohrmann 
wrote:

> Hi,
>
> we are actively looking for contributors (and anyone interested) for the
> Flink DC/OS package, as well as the Mesos Flink framework.
>
> We will have a first meeting to explain the current package and
> outline/develop a potential roadmap in the next virtual DC/OS office
> hours: https://www.meetup.com/DC-OS-Online-Meetup/events/240919616/
>
> If you cannot make it to this virtual event, feel free to reach out via
> the #flink channel in the  DC/OS community slack (chat.dcos.io).
>
> Looking forward to many cool features around Flink and Mesos/DC/OS,
> Jörg and Till
>


Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Fabian Hueske
If the data does not have a key (or you do not care about it) you can also
use a FlatMapFunction (or ProcessFunction) with Operator State. Operator
State is not bound to a key but to a parallel operator instance. Have a
look at the ListCheckpointed interface and its JavaDocs.
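
A minimal sketch of what I mean, assuming String records and using a placeholder
for the actual batch lookup:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Buffers up to 100 records per parallel instance and enriches them in one batch call.
// The buffer is snapshotted as operator (non-keyed) state via ListCheckpointed.
public class BatchingEnricher implements FlatMapFunction<String, String>, ListCheckpointed<String> {

    private static final int BATCH_SIZE = 100;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        buffer.add(value);
        if (buffer.size() >= BATCH_SIZE) {
            // stand-in for the DynamoDB batch get; emits one enriched record per input
            for (String enriched : batchLookup(buffer)) {
                out.collect(enriched);
            }
            buffer.clear();
        }
        // note: flushing a partially filled buffer (e.g. on a timer) is omitted in this sketch
    }

    @Override
    public List<String> snapshotState(long checkpointId, long timestamp) {
        return new ArrayList<>(buffer);   // buffered records survive a failure
    }

    @Override
    public void restoreState(List<String> state) {
        buffer.clear();
        buffer.addAll(state);
    }

    private List<String> batchLookup(List<String> records) {
        return new ArrayList<>(records);  // placeholder for the real enrichment
    }
}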

2017-06-23 18:27 GMT+02:00 Edward :

> So there is no way to do a countWindow(100) and preserve data locality?
>
> My use case is this: augment a data stream with new fields from DynamoDB
> lookup. DynamoDB allows batch gets of up to 100 records, so I am trying to
> collect 100 records before making that call. I have no other reason to do a
> repartitioning, so I am hoping to avoid incurring the cost of shipping all
> the data across the network to do this.
>
> If I use countWindowAll, I am limited to parallelism = 1, so all data gets
> repartitioned twice. And if I use keyBy().countWindow(), then it gets
> repartitioned by key. So in both cases I lose locality.
>
> Am I missing any other options?
>
>
>
>


A way to purge an empty session

2017-06-23 Thread Gwenhael Pasquiers
Hello,

This may be premature optimization for memory usage, but here is my question:

I have to build an app that will monitor sessions of (millions of) users. 
I don’t know when a session starts or ends, nor a reasonable maximum 
duration.

I want to have a maximum duration (timeout) of 24H.

However I’d like to be able to PURGE sessions that ended as soon as possible to 
free memory.

I use a Trigger to fire my WindowFunction for each input EVENT. It will (when 
relevant) output a SESSION (with start and end timestamps). I use the Evictor 
in order to remove the EVENTs used to build the SESSION (they have a Boolean 
field “removable” set to true by the WindowFunction so that the Evictor knows 
it can remove them in the evictAfter method)… That way at least I can clean the 
content of the windows.
However, from what I’m seeing, it looks like the window instance will still 
stay alive (even if empty) until it reaches its maximum duration (24 hours), 
even if the session it represents lasted 2 minutes: at the end of the day I 
might have millions of sessions in memory when in reality only thousands are 
really alive at a given time. That might also really slow down the backups and 
restores of the application if it needs to store millions of empty windows.

I’m aware that my need looks like the session windows. But the session window 
works mainly by merging windows that overlap within a session gap. My session 
gap can be multiple hours long, so I’m afraid that it would not help me…

So my question is: is there a way to inform the “Trigger” that the window has 
no more elements and that it can be PURGED? Or a way for a WindowFunction to 
“kill” the window it’s being applied on? Of course my window might be 
re-created if new events arrive later for the same key.

My other option is to simply use a flatmap operator that will hold a HashMap 
of sessions; that way I might be able to clean it up when I close my sessions, 
but I think it would be prettier to rely on Flink’s windows ;-)
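
For reference, this is roughly the fallback I have in mind, as a keyed
ProcessFunction rather than a plain flatmap (only a sketch, with made-up Event
and Session types and the end-of-session check left as a placeholder):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Made-up types for the sketch.
class Event { public long timestamp; }
class Session { public long start; public long end; }

public class SessionTracker extends ProcessFunction<Event, Session> {

    private static final long MAX_DURATION = 24 * 60 * 60 * 1000L; // 24h safety timeout

    private transient ValueState<Session> current;

    @Override
    public void open(Configuration parameters) {
        current = getRuntimeContext().getState(
                new ValueStateDescriptor<>("session", Session.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Session> out) throws Exception {
        Session session = current.value();
        if (session == null) {
            session = new Session();
            session.start = event.timestamp;
            // safety net in case the end of the session is never observed
            ctx.timerService().registerEventTimeTimer(event.timestamp + MAX_DURATION);
        }
        session.end = event.timestamp;

        if (sessionEnded(session, event)) {
            out.collect(session);
            current.clear();       // nothing kept for this key once the session is closed
        } else {
            current.update(session);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Session> out) throws Exception {
        Session session = current.value();
        if (session != null) {     // the session hit the timeout: emit what we have and free the state
            out.collect(session);
            current.clear();
        }
    }

    private boolean sessionEnded(Session session, Event event) {
        return false;              // placeholder for the real end-of-session condition
    }
}

Applied after a keyBy on the user; the state is cleared as soon as a session is
emitted, so only the keys with an open session keep anything around.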

Thanks in advance,


Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Edward
So there is no way to do a countWindow(100) and preserve data locality?

My use case is this: augment a data stream with new fields from DynamoDB
lookup. DynamoDB allows batch gets of up to 100 records, so I am trying to
collect 100 records before making that call. I have no other reason to do a
repartitioning, so I am hoping to avoid incurring the cost of shipping all
the data across the network to do this. 

If I use countWindowAll, I am limited to parallelism = 1, so all data gets
repartitioned twice. And if I use keyBy().countWindow(), then it gets
repartitioned by key. So in both cases I lose locality.

Am I missing any other options?





Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Fabian Hueske
No, you will lose data locality if you use keyBy(), which is the only way
to obtain a KeyedStream.

2017-06-23 17:52 GMT+02:00 Edward :

> Thanks, Fabian.
> In this case, I could just extend your idea by creating some deterministic
> multiplier of the subtask index:
>
>   RichMapFunction<String, Tuple2<Integer, String>> keyByMap =
>       new RichMapFunction<String, Tuple2<Integer, String>>() {
>           public Tuple2<Integer, String> map(String value) {
>               int indexOfCounter = Math.abs(value.hashCode()) % 4;
>               int key = ((getRuntimeContext().getIndexOfThisSubtask() + 1)
>                       * (indexOfCounter + 1)) - 1;
>               counters.get(key).add(1);
>               return new Tuple2<>(key, value);
>           }
>       };
>
> With this idea, if there are 12 subtasks, then subtask 0 would create 4
> keys: 0, 12, 24, and 36.
>
> The big advantage of your idea was that it would keep the data local. Is
> this still true with my example here (where I'm applying a function to the
> subtask index)? That is, if each partition is generating a unique set of
> keys (unique to that subtask), will it optimize to keep that set of keys
> local for the next downstream subtask?
>
>
>
>


[ANNOUNCE] Apache Flink 1.3.1 released

2017-06-23 Thread Robert Metzger
The Apache Flink community is pleased to announce the release of Apache
Flink 1.3.1.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.


The release is available for download at:

https://flink.apache.org/downloads.html

There is a blog post available on the Flink site listing all the addressed
issues:

   http://flink.apache.org/news/2017/06/23/release-1.3.1.html



We would like to thank all contributors who made this release possible!


Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Edward
Thanks, Fabian.
In this case, I could just extend your idea by creating some deterministic
multiplier of the subtask index:

  RichMapFunction<String, Tuple2<Integer, String>> keyByMap =
      new RichMapFunction<String, Tuple2<Integer, String>>() {
          public Tuple2<Integer, String> map(String value) {
              int indexOfCounter = Math.abs(value.hashCode()) % 4;
              int key = ((getRuntimeContext().getIndexOfThisSubtask() + 1)
                      * (indexOfCounter + 1)) - 1;
              counters.get(key).add(1);
              return new Tuple2<>(key, value);
          }
      };

With this idea, if there are 12 subtasks, then subtask 0 would create 4
keys: 0, 12, 24, and 36.

The big advantage of your idea was that it would keep the data local. Is
this still true with my example here (where I'm applying a function to the
subtask index)? That is, if each partition is generating a unique set of
keys (unique to that subtask), will it optimize to keep that set of keys
local for the next downstream subtask?





Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Fabian Hueske
Flink hashes the keys and computes the target partition using modulo. This
works well if you have many keys, but leads to skew if the number of keys
is close to the number of partitions.
If you use partitionCustom, you can explicitly define the target partition;
however, partitionCustom does not return a KeyedStream, so you cannot use
keyed state or windows there.
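
For illustration, this is roughly what partitionCustom looks like; the
Partitioner below and the index-as-key scheme are just assumptions based on
your earlier snippet:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class ExplicitPartitioning {

    // Sends each (targetIndex, value) record to exactly that parallel instance.
    public static DataStream<Tuple2<Integer, String>> routeByIndex(
            DataStream<Tuple2<Integer, String>> input) {

        return input.partitionCustom(
                new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        return key % numPartitions;   // the key already encodes the target partition
                    }
                },
                new KeySelector<Tuple2<Integer, String>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, String> value) {
                        return value.f0;
                    }
                });
    }
}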

Not sure if that works for your usecase, but you could try to use more keys
to achieve a more uniform key distribution.

Best, Fabian

2017-06-23 15:34 GMT+02:00 Edward :

> Hi Fabian -
> I've tried this idea of creating a KeyedStream based on
> getRuntimeContext().getIndexOfThisSubtask(). However, not all target
> subtasks are receiving records.
>
> All subtasks have a parallelism of 12, so I have 12 source subtasks and 12
> target subtasks. I've confirmed that the call to getIndexOfThisSubtask is
> evenly distributed between 0 and 11. However, 4 out of the 12 target
> subtasks (the subtasks after the hash) are not receiving any data. This
> means
> it's not actually keeping all the data local, because at least 4 of the 12
> partitions could be getting sent to different TaskManagers.
>
> Do I need to do a .partitionCustom to ensure even/local distribution?
>
> Thanks,
> Edward
>
>
>
>


Re: Integrating Flink CEP with a Rules Engine

2017-06-23 Thread Suneel Marthi
Sorry I didn't read the whole thread.

We have a similar requirement wherein the users would like to add/update/delete
CEP patterns via a UX or REST API, and we started discussing building a REST
API for that. Glad to see that this is a common ask, and if there's already
a community effort around this, that's great to know.

On Fri, Jun 23, 2017 at 9:54 AM, Sridhar Chellappa 
wrote:

> Folks,
>
> Plenty of very good points but I see this discussion digressing from what
> I originally asked for. We need a dashboard to let the Business Analysts to
> define rules and the CEP to run them.
>
> My original question was how to solve this with Flink CEP?
>
> From what I see, this is not a solved problem. Correct me if I am wrong.
>
> On Fri, Jun 23, 2017 at 6:52 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi all,
>>
>> Currently there is an ongoing effort to integrate FlinkCEP with Flink's
>> SQL API.
>> There is already an open FLIP for this:
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A+Integration+of+SQL+and+CEP
>>
>> So, if there was an effort for integration of different
>> libraries/tools/functionality as well, it
>> would be nice to go a bit more into details on i) what is already there,
>> ii) what is planned to be
>> integrated for the SQL effort, and iii) what else is required, and
>> consolidate the resources
>> available.
>>
>> This will allow the community to move faster and with a clear roadmap.
>>
>> Kostas
>>
>> On Jun 23, 2017, at 2:51 PM, Suneel Marthi  wrote:
>>
>> FWIW, here's an old Cloudera blog about using Drools with Spark.
>>
>> https://blog.cloudera.com/blog/2015/11/how-to-build-a-complex-event-processing-app-on-apache-spark-and-drools/
>>
>> It should be possible to invoke Drools from Flink in a similar way (I
>> have not tried it).
>>
>> It all depends on what the use case and how much of present Flink CEP
>> satisfies the use case before considering integration with more complex
>> rule engines.
>>
>>
>> Disclaimer: I work for Red Hat
>>
>> On Fri, Jun 23, 2017 at 8:43 AM, Ismaël Mejía  wrote:
>>
>>> Hello,
>>>
>>> It is really interesting to see this discussion because that was one
>>> of the questions on the presentation on CEP at Berlin Buzzwords, and
>>> this is one line of work that may eventually make sense to explore.
>>>
>>> Rule engines like drools implement the Rete algorithm that if I
>>> understood correctly optimizes the analysis of a relatively big set of
>>> facts (conditions) into a simpler evaluation graph. For more details
>>> this is a really nice explanation.
>>> https://www.sparklinglogic.com/rete-algorithm-demystified-part-2/
>>>
>>> On flink's CEP I have the impression that you define this graph by
>>> hand. Using a rule engine you could infer an optimal graph from the
>>> set of rules, and then this graph could be translated into CEP
>>> patterns.
>>>
>>> Of course take all of this with a grain of salt because I am not an
>>> expert on both CEP or the Rete algorithm, but I start to see the
>>> connection of both worlds more clearly now. So if anyone else has
>>> ideas of the feasibility of this or can see some other
>>> issues/consequences please comment. I also have the impression that
>>> distribution is less of an issue because the rete network is
>>> calculated only once and updates are not 'dynamic' (but I might be
>>> wrong).
>>>
>>> Ismaël
>>>
>>> ps. I add Thomas in copy who was who made the question in the
>>> conference in case he has some comments/ideas.
>>>
>>>
>>> On Fri, Jun 23, 2017 at 1:48 PM, Kostas Kloudas
>>>  wrote:
>>> > Hi Jorn and Sridhar,
>>> >
>>> > It would be worth describing a bit more what these tools are and what
>>> are
>>> > your needs.
>>> > In addition, and to see what the CEP library already offers here you
>>> can
>>> > find the documentation:
>>> >
>>> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html
>>> >
>>> >
>>> > Thanks,
>>> > Kostas
>>> >
>>> > On Jun 23, 2017, at 1:41 PM, Jörn Franke  wrote:
>>> >
>>> > Hallo,
>>> >
>>> > It si possible, but some caveat : flink is a distributed system, but in
>>> > drools the fact are only locally available. This may lead to strange
>>> effects
>>> > when rules update the fact base.
>>> >
>>> > Best regards
>>> >
>>> > On 23. Jun 2017, at 12:49, Sridhar Chellappa 
>>> wrote:
>>> >
>>> > Folks,
>>> >
>>> > I am new to Flink.
>>> >
>>> > One of the reasons why I am interested in Flink is because of its CEP
>>> > library. Our CEP logic comprises of a set of complex business rules
>>> which
>>> > will have to be managed (Create, Update, Delete) by a bunch of business
>>> > analysts.
>>> >
>>> > Is there a way I can integrate other third party tools (Drools,
>>> OpenRules)
>>> > to let Business Analysts define rules and  execute them using Flink's
>>> CEP
>>> > library?
>>> >
>>> >
>>>
>>
>>
>>
>


Gelly - bipartite graph runs vertex-centric

2017-06-23 Thread Kaepke, Marc
Hi,

does Gelly provide a vertex-centric iteration on a bipartite graph?

A bipartite graph uses BipartiteEdges, while vertex-centric iteration supports 
regular edges only.


Thanks!

Best,
Marc

Re: Recursive Traversal of the Input Path Directory, Not working

2017-06-23 Thread Stefan Richter
Hi,

I suggest that you simply open an issue for this in our jira, describing the 
improvement idea. That should be the fastest way to get this changed.

Best,
Stefan

> Am 23.06.2017 um 15:08 schrieb Adarsh Jain :
> 
> Hi Stefan,
> 
> I think I found the problem, try it with a file which starts with underscore 
> in the name like "_part-1-0.csv".
> 
> While saving Flink appends a "_" to the file name however while reading at 
> folder level it does not pick those files.
> 
> Can you suggest if we can do a setting so that it does not pre appends 
> underscore while saving a file.
> 
> Regards,
> Adarsh
> 
> On Fri, Jun 23, 2017 at 3:24 PM, Stefan Richter  > wrote:
> No, that doesn’t make a difference and also works.
> 
>> Am 23.06.2017 um 11:40 schrieb Adarsh Jain > >:
>> 
>> I am using "val env = ExecutionEnvironment.getExecutionEnvironment", can 
>> this be the problem?
>> 
>> With "import org.apache.flink.api.scala.ExecutionEnvironment"
>> 
>> Using scala in my program.
>> 
>> Regards,
>> Adarsh 
>> 
>> On Fri, Jun 23, 2017 at 3:01 PM, Stefan Richter > > wrote:
>> I just copy pasted your code, adding the missing "val env = 
>> LocalEnvironment.createLocalEnvironment()" and exchanged the string with a 
>> local directory for some test files that I created. No other changes.
>> 
>>> Am 23.06.2017 um 11:25 schrieb Adarsh Jain >> >:
>>> 
>>> Hi Stefan,
>>> 
>>> Thanks for your efforts in checking the same, still doesn't work for me. 
>>> 
>>> Can you copy paste the code you used maybe I am doing some silly mistake 
>>> and am not able to figure out the same.
>>> 
>>> Thanks again.
>>> 
>>> Regards,
>>> Adarsh
>>> 
>>> 
>>> On Fri, Jun 23, 2017 at 2:32 PM, Stefan Richter 
>>> mailto:s.rich...@data-artisans.com>> wrote:
>>> Hi,
>>> 
>>> I tried this out on the current master and the 1.3 release and both work 
>>> for me everything works exactly as expected, for file names, a directory, 
>>> and even nested directories.
>>> 
>>> Best,
>>> Stefan
>>> 
 Am 22.06.2017 um 21:13 schrieb Adarsh Jain >>> >:
 
 Hi Stefan,
 
 Yes your understood right, when I give full path till the filename it 
 works fine however when I give path till 
 directory it does not read the data, doesn't print any exceptions too ... 
 I am also not sure why it is behaving like this.
 
 Should be easily replicable, in case you can try. Will be really helpful.
 
 Regards,
 Adarsh
 
 On Thu, Jun 22, 2017 at 9:00 PM, Stefan Richter 
 mailto:s.rich...@data-artisans.com>> wrote:
 Hi,
 
 I am not sure I am getting the problem right: the code works if you use a 
 file name, but it does not work for directories? What exactly is not 
 working? Do you get any exceptions?
 
 Best,
 Stefan
 
> Am 22.06.2017 um 17:01 schrieb Adarsh Jain  >:
> 
> Hi,
> 
> I am trying to use "Recursive Traversal of the Input Path Directory" in 
> Flink 1.3 using scala. Snippet of my code below. If I give exact file 
> name it is working fine. Ref 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html
>  
> 
> 
> import org.apache.flink.api.java.utils.ParameterTool
> import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
> import org.apache.flink.configuration.Configuration
> 
> val config = new Configuration
> config.setBoolean("recursive.file.enumeration",true)
> 
> val featuresSource: String =
> "file:///Users/adarsh/Documents/testData/featurecsv/31c710ac40/2017/06/22"
> 
> val testInput = env.readTextFile(featuresSource).withParameters(config)
> testInput.print()
> 
> Please guide how to fix this.
> 
> Regards,
> Adarsh
> 
 
 
>>> 
>>> 
>> 
>> 
> 
> 



Re: Integrating Flink CEP with a Rules Engine

2017-06-23 Thread Kostas Kloudas
The rules, or patterns, supported by FlinkCEP are presented in the 
documentation link I posted earlier.

Dynamically updating these patterns is not supported yet, but there are 
discussions to add this feature soon.

If the rules you need are supported by the current version of FlinkCEP, then 
you can 
start right away. If not, you need to provide more details.
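
Just for orientation, a pattern in the current FlinkCEP looks roughly like this
(the event type and the conditions are made-up placeholders):

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// Made-up event type for the sketch.
class TxnEvent {
    public String type;
    public double amount;
}

public class SimpleCepRule {

    // "A CASH transaction over 10k followed by another one within 10 minutes."
    public static PatternStream<TxnEvent> applyRule(DataStream<TxnEvent> events) {
        Pattern<TxnEvent, ?> pattern = Pattern.<TxnEvent>begin("first")
                .where(new SimpleCondition<TxnEvent>() {
                    @Override
                    public boolean filter(TxnEvent e) {
                        return "CASH".equals(e.type) && e.amount > 10_000;
                    }
                })
                .followedBy("second")
                .where(new SimpleCondition<TxnEvent>() {
                    @Override
                    public boolean filter(TxnEvent e) {
                        return "CASH".equals(e.type) && e.amount > 10_000;
                    }
                })
                .within(Time.minutes(10));

        // select()/flatSelect() on the PatternStream then turn matches into alerts
        return CEP.pattern(events, pattern);
    }
}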

Kostas

> On Jun 23, 2017, at 3:54 PM, Sridhar Chellappa  wrote:
> 
> CEP



Re: Integrating Flink CEP with a Rules Engine

2017-06-23 Thread Sridhar Chellappa
Folks,

Plenty of very good points, but I see this discussion digressing from what I
originally asked for. We need a dashboard to let the Business Analysts define
rules, and the CEP engine to run them.

My original question was how to solve this with Flink CEP?

From what I see, this is not a solved problem. Correct me if I am wrong.

On Fri, Jun 23, 2017 at 6:52 PM, Kostas Kloudas  wrote:

> Hi all,
>
> Currently there is an ongoing effort to integrate FlinkCEP with Flink's
> SQL API.
> There is already an open FLIP for this:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A+Integration+of+SQL+and+CEP
>
> So, if there was an effort for integration of different
> libraries/tools/functionality as well, it
> would be nice to go a bit more into details on i) what is already there,
> ii) what is planned to be
> integrated for the SQL effort, and iii) what else is required, and
> consolidate the resources
> available.
>
> This will allow the community to move faster and with a clear roadmap.
>
> Kostas
>
> On Jun 23, 2017, at 2:51 PM, Suneel Marthi  wrote:
>
> FWIW, here's an old Cloudera blog about using Drools with Spark.
>
> https://blog.cloudera.com/blog/2015/11/how-to-build-a-complex-event-processing-app-on-apache-spark-and-drools/
>
> It should be possible to invoke Drools from Flink in a similar way (I have
> not tried it).
>
> It all depends on what the use case and how much of present Flink CEP
> satisfies the use case before considering integration with more complex
> rule engines.
>
>
> Disclaimer: I work for Red Hat
>
> On Fri, Jun 23, 2017 at 8:43 AM, Ismaël Mejía  wrote:
>
>> Hello,
>>
>> It is really interesting to see this discussion because that was one
>> of the questions on the presentation on CEP at Berlin Buzzwords, and
>> this is one line of work that may eventually make sense to explore.
>>
>> Rule engines like drools implement the Rete algorithm that if I
>> understood correctly optimizes the analysis of a relatively big set of
>> facts (conditions) into a simpler evaluation graph. For more details
>> this is a really nice explanation.
>> https://www.sparklinglogic.com/rete-algorithm-demystified-part-2/
>>
>> On flink's CEP I have the impression that you define this graph by
>> hand. Using a rule engine you could infer an optimal graph from the
>> set of rules, and then this graph could be translated into CEP
>> patterns.
>>
>> Of course take all of this with a grain of salt because I am not an
>> expert on both CEP or the Rete algorithm, but I start to see the
>> connection of both worlds more clearly now. So if anyone else has
>> ideas of the feasibility of this or can see some other
>> issues/consequences please comment. I also have the impression that
>> distribution is less of an issue because the rete network is
>> calculated only once and updates are not 'dynamic' (but I might be
>> wrong).
>>
>> Ismaël
>>
>> ps. I add Thomas in copy who was who made the question in the
>> conference in case he has some comments/ideas.
>>
>>
>> On Fri, Jun 23, 2017 at 1:48 PM, Kostas Kloudas
>>  wrote:
>> > Hi Jorn and Sridhar,
>> >
>> > It would be worth describing a bit more what these tools are and what
>> are
>> > your needs.
>> > In addition, and to see what the CEP library already offers here you can
>> > find the documentation:
>> >
>> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html
>> >
>> >
>> > Thanks,
>> > Kostas
>> >
>> > On Jun 23, 2017, at 1:41 PM, Jörn Franke  wrote:
>> >
>> > Hallo,
>> >
>> > It si possible, but some caveat : flink is a distributed system, but in
>> > drools the fact are only locally available. This may lead to strange
>> effects
>> > when rules update the fact base.
>> >
>> > Best regards
>> >
>> > On 23. Jun 2017, at 12:49, Sridhar Chellappa 
>> wrote:
>> >
>> > Folks,
>> >
>> > I am new to Flink.
>> >
>> > One of the reasons why I am interested in Flink is because of its CEP
>> > library. Our CEP logic comprises of a set of complex business rules
>> which
>> > will have to be managed (Create, Update, Delete) by a bunch of business
>> > analysts.
>> >
>> > Is there a way I can integrate other third party tools (Drools,
>> OpenRules)
>> > to let Business Analysts define rules and  execute them using Flink's
>> CEP
>> > library?
>> >
>> >
>>
>
>
>


Re: Strange behavior of DataStream.countWindow

2017-06-23 Thread Edward
Hi Fabian - 
I've tried this idea of creating a KeyedStream based on
getRuntimeContext().getIndexOfThisSubtask(). However, not all target
subtasks are receiving records.

All subtasks have a parallelism of 12, so I have 12 source subtasks and 12
target subtasks. I've confirmed that the call to getIndexOfThisSubtask is
evenly distributed between 0 and 11. However, 4 out of the 12 target
subtasks (the subtasks after the hash) are not receiving any data. This means
it's not actually keeping all the data local, because at least 4 of the 12
partitions could be getting sent to different TaskManagers.

Do I need to do a .partitionCustom to ensure even/local distribution?

Thanks,
Edward





Re: Integrating Flink CEP with a Rules Engine

2017-06-23 Thread Kostas Kloudas
Hi all,

Currently there is an ongoing effort to integrate FlinkCEP with Flink's SQL API.
There is already an open FLIP for this:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A+Integration+of+SQL+and+CEP
 


So, if there was an effort for integration of different 
libraries/tools/functionality as well, it 
would be nice to go a bit more into details on i) what is already there, ii) 
what is planned to be 
integrated for the SQL effort, and iii) what else is required, and consolidate 
the resources
available.

This will allow the community to move faster and with a clear roadmap.

Kostas

> On Jun 23, 2017, at 2:51 PM, Suneel Marthi  wrote:
> 
> FWIW, here's an old Cloudera blog about using Drools with Spark.
> 
> https://blog.cloudera.com/blog/2015/11/how-to-build-a-complex-event-processing-app-on-apache-spark-and-drools/
>  
> 
> 
> It should be possible to invoke Drools from Flink in a similar way (I have 
> not tried it). 
> 
> It all depends on what the use case and how much of present Flink CEP 
> satisfies the use case before considering integration with more complex rule 
> engines.
> 
> 
> Disclaimer: I work for Red Hat
> 
> On Fri, Jun 23, 2017 at 8:43 AM, Ismaël Mejía  > wrote:
> Hello,
> 
> It is really interesting to see this discussion because that was one
> of the questions on the presentation on CEP at Berlin Buzzwords, and
> this is one line of work that may eventually make sense to explore.
> 
> Rule engines like drools implement the Rete algorithm that if I
> understood correctly optimizes the analysis of a relatively big set of
> facts (conditions) into a simpler evaluation graph. For more details
> this is a really nice explanation.
> https://www.sparklinglogic.com/rete-algorithm-demystified-part-2/ 
> 
> 
> On flink's CEP I have the impression that you define this graph by
> hand. Using a rule engine you could infer an optimal graph from the
> set of rules, and then this graph could be translated into CEP
> patterns.
> 
> Of course take all of this with a grain of salt because I am not an
> expert on both CEP or the Rete algorithm, but I start to see the
> connection of both worlds more clearly now. So if anyone else has
> ideas of the feasibility of this or can see some other
> issues/consequences please comment. I also have the impression that
> distribution is less of an issue because the rete network is
> calculated only once and updates are not 'dynamic' (but I might be
> wrong).
> 
> Ismaël
> 
> ps. I add Thomas in copy who was who made the question in the
> conference in case he has some comments/ideas.
> 
> 
> On Fri, Jun 23, 2017 at 1:48 PM, Kostas Kloudas
> mailto:k.klou...@data-artisans.com>> wrote:
> > Hi Jorn and Sridhar,
> >
> > It would be worth describing a bit more what these tools are and what are
> > your needs.
> > In addition, and to see what the CEP library already offers here you can
> > find the documentation:
> >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html
> >  
> > 
> >
> >
> > Thanks,
> > Kostas
> >
> > On Jun 23, 2017, at 1:41 PM, Jörn Franke  > > wrote:
> >
> > Hallo,
> >
> > It si possible, but some caveat : flink is a distributed system, but in
> > drools the fact are only locally available. This may lead to strange effects
> > when rules update the fact base.
> >
> > Best regards
> >
> > On 23. Jun 2017, at 12:49, Sridhar Chellappa  > > wrote:
> >
> > Folks,
> >
> > I am new to Flink.
> >
> > One of the reasons why I am interested in Flink is because of its CEP
> > library. Our CEP logic comprises of a set of complex business rules which
> > will have to be managed (Create, Update, Delete) by a bunch of business
> > analysts.
> >
> > Is there a way I can integrate other third party tools (Drools, OpenRules)
> > to let Business Analysts define rules and  execute them using Flink's CEP
> > library?
> >
> >
> 



Re: About nodes number on Flink

2017-06-23 Thread AndreaKinn
Hi Timo, thanks for your answer.
I think my processing is not very heavy, so I imagine I will get no advantage
from "parallelizing" the streams.

In my mind I have this pipeline:


 

And this is exactly what I want to develop: I need a pipeline where a node runs
a fixed operation and forwards the processed data to the next one, and so on.
How can I obtain this?

If I understand correctly, "chaining" covers the possibility of executing
multiple operations on the same thread to improve performance.





Re: Recursive Traversal of the Input Path Directory, Not working

2017-06-23 Thread Adarsh Jain
Hi Stefan,

I think I found the problem: try it with a file whose name starts with an
underscore, like "_part-1-0.csv".

While saving, Flink prepends a "_" to the file name; however, while reading at
folder level it does not pick up those files.

Can you suggest a setting so that it does not prepend the underscore while
saving a file?

Regards,
Adarsh

On Fri, Jun 23, 2017 at 3:24 PM, Stefan Richter  wrote:

> No, that doesn’t make a difference and also works.
>
> Am 23.06.2017 um 11:40 schrieb Adarsh Jain :
>
> I am using "val env = ExecutionEnvironment.getExecutionEnvironment", can
> this be the problem?
>
> With "import org.apache.flink.api.scala.ExecutionEnvironment"
>
> Using scala in my program.
>
> Regards,
> Adarsh
>
> On Fri, Jun 23, 2017 at 3:01 PM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> I just copy pasted your code, adding the missing "val env
>> = LocalEnvironment.createLocalEnvironment()" and exchanged the string
>> with a local directory for some test files that I created. No other changes.
>>
>> Am 23.06.2017 um 11:25 schrieb Adarsh Jain :
>>
>> Hi Stefan,
>>
>> Thanks for your efforts in checking the same, still doesn't work for me.
>>
>> Can you copy paste the code you used maybe I am doing some silly mistake
>> and am not able to figure out the same.
>>
>> Thanks again.
>>
>> Regards,
>> Adarsh
>>
>>
>> On Fri, Jun 23, 2017 at 2:32 PM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> I tried this out on the current master and the 1.3 release and both work
>>> for me everything works exactly as expected, for file names, a directory,
>>> and even nested directories.
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 22.06.2017 um 21:13 schrieb Adarsh Jain :
>>>
>>> Hi Stefan,
>>>
>>> Yes your understood right, when I give full path till the filename it
>>> works fine however when I give path till
>>> directory it does not read the data, doesn't print any exceptions too
>>> ... I am also not sure why it is behaving like this.
>>>
>>> Should be easily replicable, in case you can try. Will be really helpful.
>>>
>>> Regards,
>>> Adarsh
>>>
>>> On Thu, Jun 22, 2017 at 9:00 PM, Stefan Richter <
>>> s.rich...@data-artisans.com> wrote:
>>>
 Hi,

 I am not sure I am getting the problem right: the code works if you use
 a file name, but it does not work for directories? What exactly is not
 working? Do you get any exceptions?

 Best,
 Stefan

 Am 22.06.2017 um 17:01 schrieb Adarsh Jain :

 Hi,

 I am trying to use "Recursive Traversal of the Input Path Directory" in
 Flink 1.3 using scala. Snippet of my code below. If I give exact file name
 it is working fine. Ref https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html

 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.configuration.Configuration

 val config = new Configuration
 config.setBoolean("recursive.file.enumeration",true)

 val featuresSource: String = "file:///Users/adarsh/Documents/testData/featurecsv/31c710ac40/2017/06/22"

 val testInput = env.readTextFile(featuresSource).withParameters(config)
 testInput.print()

 Please guide how to fix this.

 Regards,
 Adarsh



>>>
>>>
>>
>>
>
>


Re: Integrating Flink CEP with a Rules Engine

2017-06-23 Thread Suneel Marthi
FWIW, here's an old Cloudera blog about using Drools with Spark.

https://blog.cloudera.com/blog/2015/11/how-to-build-a-complex-event-processing-app-on-apache-spark-and-drools/

It should be possible to invoke Drools from Flink in a similar way (I have
not tried it).

It all depends on what the use case and how much of present Flink CEP
satisfies the use case before considering integration with more complex
rule engines.


Disclaimer: I work for Red Hat

On Fri, Jun 23, 2017 at 8:43 AM, Ismaël Mejía  wrote:

> Hello,
>
> It is really interesting to see this discussion because that was one
> of the questions on the presentation on CEP at Berlin Buzzwords, and
> this is one line of work that may eventually make sense to explore.
>
> Rule engines like drools implement the Rete algorithm that if I
> understood correctly optimizes the analysis of a relatively big set of
> facts (conditions) into a simpler evaluation graph. For more details
> this is a really nice explanation.
> https://www.sparklinglogic.com/rete-algorithm-demystified-part-2/
>
> On flink's CEP I have the impression that you define this graph by
> hand. Using a rule engine you could infer an optimal graph from the
> set of rules, and then this graph could be translated into CEP
> patterns.
>
> Of course take all of this with a grain of salt because I am not an
> expert on both CEP or the Rete algorithm, but I start to see the
> connection of both worlds more clearly now. So if anyone else has
> ideas of the feasibility of this or can see some other
> issues/consequences please comment. I also have the impression that
> distribution is less of an issue because the rete network is
> calculated only once and updates are not 'dynamic' (but I might be
> wrong).
>
> Ismaël
>
> ps. I add Thomas in copy who was who made the question in the
> conference in case he has some comments/ideas.
>
>
> On Fri, Jun 23, 2017 at 1:48 PM, Kostas Kloudas
>  wrote:
> > Hi Jorn and Sridhar,
> >
> > It would be worth describing a bit more what these tools are and what are
> > your needs.
> > In addition, and to see what the CEP library already offers here you can
> > find the documentation:
> >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html
> >
> >
> > Thanks,
> > Kostas
> >
> > On Jun 23, 2017, at 1:41 PM, Jörn Franke  wrote:
> >
> > Hallo,
> >
> > It si possible, but some caveat : flink is a distributed system, but in
> > drools the fact are only locally available. This may lead to strange
> effects
> > when rules update the fact base.
> >
> > Best regards
> >
> > On 23. Jun 2017, at 12:49, Sridhar Chellappa 
> wrote:
> >
> > Folks,
> >
> > I am new to Flink.
> >
> > One of the reasons why I am interested in Flink is because of its CEP
> > library. Our CEP logic comprises of a set of complex business rules which
> > will have to be managed (Create, Update, Delete) by a bunch of business
> > analysts.
> >
> > Is there a way I can integrate other third party tools (Drools,
> OpenRules)
> > to let Business Analysts define rules and  execute them using Flink's CEP
> > library?
> >
> >
>


Re: Integrating Flink CEP with a Rules Engine

2017-06-23 Thread Ismaël Mejía
Hello,

It is really interesting to see this discussion because that was one
of the questions on the presentation on CEP at Berlin Buzzwords, and
this is one line of work that may eventually make sense to explore.

Rule engines like Drools implement the Rete algorithm, which, if I
understood correctly, optimizes the analysis of a relatively big set of
facts (conditions) into a simpler evaluation graph. For more details,
this is a really nice explanation:
https://www.sparklinglogic.com/rete-algorithm-demystified-part-2/

On flink's CEP I have the impression that you define this graph by
hand. Using a rule engine you could infer an optimal graph from the
set of rules, and then this graph could be translated into CEP
patterns.

Of course, take all of this with a grain of salt because I am not an
expert on either CEP or the Rete algorithm, but I start to see the
connection of both worlds more clearly now. So if anyone else has
ideas of the feasibility of this or can see some other
issues/consequences please comment. I also have the impression that
distribution is less of an issue because the rete network is
calculated only once and updates are not 'dynamic' (but I might be
wrong).

Ismaël

ps. I add Thomas in copy, who was the one who asked the question at the
conference, in case he has some comments/ideas.


On Fri, Jun 23, 2017 at 1:48 PM, Kostas Kloudas
 wrote:
> Hi Jorn and Sridhar,
>
> It would be worth describing a bit more what these tools are and what are
> your needs.
> In addition, and to see what the CEP library already offers here you can
> find the documentation:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html
>
>
> Thanks,
> Kostas
>
> On Jun 23, 2017, at 1:41 PM, Jörn Franke  wrote:
>
> Hallo,
>
> It si possible, but some caveat : flink is a distributed system, but in
> drools the fact are only locally available. This may lead to strange effects
> when rules update the fact base.
>
> Best regards
>
> On 23. Jun 2017, at 12:49, Sridhar Chellappa  wrote:
>
> Folks,
>
> I am new to Flink.
>
> One of the reasons why I am interested in Flink is because of its CEP
> library. Our CEP logic comprises of a set of complex business rules which
> will have to be managed (Create, Update, Delete) by a bunch of business
> analysts.
>
> Is there a way I can integrate other third party tools (Drools, OpenRules)
> to let Business Analysts define rules and  execute them using Flink's CEP
> library?
>
>


Re: About nodes number on Flink

2017-06-23 Thread Timo Walther

Hi Andrea,

the number of nodes usually depends on the work that you do within your 
Functions.


E.g. if you have a computation-intensive machine learning library in a 
MapFunction and it takes 10 seconds per element, it might make sense to 
parallelize this in order to increase your throughput. Or if you have to 
save state of several GBs per key, which would not fit on one machine.


Flink does not only parallelize per node but also per "slot". If you 
start your application with a parallelism of 2 (and have not configured 
custom parallelisms per operator), you will have two pipelines that 
process elements (so two MapFunctions are running in parallel, one in 
each pipeline). Two slots are occupied in this case. There are operations 
(like keyBy) that break this pipeline and repartition your data.


If you want to run operators in separate slots you can start a new chain 
(see here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups)
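
For example, roughly like this (the source and the two map functions are just 
placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);  // two parallel pipelines, as described above

        env.socketTextStream("localhost", 9999)          // stand-in source
            .map(new MapFunction<String, String>() {     // chained to the source by default
                public String map(String value) { return value.trim(); }
            })
            .map(new MapFunction<String, String>() {     // pretend this is the heavy stage
                public String map(String value) { return value.toUpperCase(); }
            })
            .startNewChain()                             // the heavy map starts its own chain
            .slotSharingGroup("heavy")                   // ...and gets slots from its own group
            .print();

        env.execute("chaining sketch");
    }
}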


If you set parallelism to 'N' but you have less than 'N' slots available, 
you cannot execute the job.


I hope my explanation helps.

Regards,
Timo


Am 22.06.17 um 16:54 schrieb AndreaKinn:

Hello,
I'm developing a Flink toy-application on my local machine before to deploy
the real one on a real cluster.
Now I have to determine how many nodes I need to set the cluster.

I already read these documents:
jobs and scheduling

programming model

parallelism


But I'm still a bit confused about how many nodes I have to consider to
execute my application.

For example if I have the following code (from the doc):


- This means that operations "on same line" are executed on same node? (It
sounds a bit strange to me)

Some confirms:
- If the answer to previous question is yes and if I set parallelism to '1'
I can establish how many nodes I need counting how many operations I have to
perform ?
- If I set parallelism to 'N' but I have less than 'N' nodes available Flink
automatically scales the elaboration on available nodes?

My throughput and data load is not relevant I think, it is not heavy.










Re: Integrating Flink CEP with a Rules Engine

2017-06-23 Thread Kostas Kloudas
Hi Jorn and Sridhar,

It would be worth describing a bit more what these tools are and what your 
needs are.
In addition, to see what the CEP library already offers, here you can find 
the documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html 



Thanks,
Kostas

> On Jun 23, 2017, at 1:41 PM, Jörn Franke  wrote:
> 
> Hallo,
> 
> It si possible, but some caveat : flink is a distributed system, but in 
> drools the fact are only locally available. This may lead to strange effects 
> when rules update the fact base.
> 
> Best regards 
> 
>> On 23. Jun 2017, at 12:49, Sridhar Chellappa  wrote:
>> 
>> Folks,
>> 
>> I am new to Flink.
>> 
>> One of the reasons why I am interested in Flink is because of its CEP 
>> library. Our CEP logic comprises of a set of complex business rules which 
>> will have to be managed (Create, Update, Delete) by a bunch of business 
>> analysts. 
>> 
>> Is there a way I can integrate other third party tools (Drools, OpenRules) 
>> to let Business Analysts define rules and  execute them using Flink's CEP 
>> library?



Re: Integrating Flink CEP with a Rules Engine

2017-06-23 Thread Jörn Franke
Hello,

It is possible, but with a caveat: Flink is a distributed system, but in Drools 
the facts are only locally available. This may lead to strange effects when 
rules update the fact base.

Best regards 

> On 23. Jun 2017, at 12:49, Sridhar Chellappa  wrote:
> 
> Folks,
> 
> I am new to Flink.
> 
> One of the reasons why I am interested in Flink is because of its CEP 
> library. Our CEP logic comprises of a set of complex business rules which 
> will have to be managed (Create, Update, Delete) by a bunch of business 
> analysts. 
> 
> Is there a way I can integrate other third party tools (Drools, OpenRules) to 
> let Business Analysts define rules and  execute them using Flink's CEP 
> library?


Integrating Flink CEP with a Rules Engine

2017-06-23 Thread Sridhar Chellappa
Folks,

I am new to Flink.

One of the reasons why I am interested in Flink is because of its CEP
library. Our CEP logic comprises a set of complex business rules which
will have to be managed (Create, Update, Delete) by a bunch of business
analysts.

Is there a way I can integrate other third party tools (Drools, OpenRules)
to let Business Analysts define rules and  execute them using Flink's CEP
library?


Re: Recursive Traversal of the Input Path Directory, Not working

2017-06-23 Thread Stefan Richter
No, that doesn’t make a difference and also works.

> Am 23.06.2017 um 11:40 schrieb Adarsh Jain :
> 
> I am using "val env = ExecutionEnvironment.getExecutionEnvironment", can this 
> be the problem?
> 
> With "import org.apache.flink.api.scala.ExecutionEnvironment"
> 
> Using scala in my program.
> 
> Regards,
> Adarsh 
> 
> On Fri, Jun 23, 2017 at 3:01 PM, Stefan Richter  > wrote:
> I just copy pasted your code, adding the missing "val env = 
> LocalEnvironment.createLocalEnvironment()" and exchanged the string with a 
> local directory for some test files that I created. No other changes.
> 
>> Am 23.06.2017 um 11:25 schrieb Adarsh Jain > >:
>> 
>> Hi Stefan,
>> 
>> Thanks for your efforts in checking the same, still doesn't work for me. 
>> 
>> Can you copy paste the code you used maybe I am doing some silly mistake and 
>> am not able to figure out the same.
>> 
>> Thanks again.
>> 
>> Regards,
>> Adarsh
>> 
>> 
>> On Fri, Jun 23, 2017 at 2:32 PM, Stefan Richter > > wrote:
>> Hi,
>> 
>> I tried this out on the current master and the 1.3 release and both work for 
>> me everything works exactly as expected, for file names, a directory, and 
>> even nested directories.
>> 
>> Best,
>> Stefan
>> 
>>> Am 22.06.2017 um 21:13 schrieb Adarsh Jain >> >:
>>> 
>>> Hi Stefan,
>>> 
>>> Yes your understood right, when I give full path till the filename it works 
>>> fine however when I give path till 
>>> directory it does not read the data, doesn't print any exceptions too ... I 
>>> am also not sure why it is behaving like this.
>>> 
>>> Should be easily replicable, in case you can try. Will be really helpful.
>>> 
>>> Regards,
>>> Adarsh
>>> 
>>> On Thu, Jun 22, 2017 at 9:00 PM, Stefan Richter 
>>> mailto:s.rich...@data-artisans.com>> wrote:
>>> Hi,
>>> 
>>> I am not sure I am getting the problem right: the code works if you use a 
>>> file name, but it does not work for directories? What exactly is not 
>>> working? Do you get any exceptions?
>>> 
>>> Best,
>>> Stefan
>>> 
 Am 22.06.2017 um 17:01 schrieb Adarsh Jain >>> >:
 
 Hi,
 
 I am trying to use "Recursive Traversal of the Input Path Directory" in 
 Flink 1.3 using scala. Snippet of my code below. If I give exact file name 
 it is working fine. Ref 
 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html
  
 
 
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.configuration.Configuration
 
 val config = new Configuration
 config.setBoolean("recursive.file.enumeration",true)
 
 val featuresSource: String =
 "file:///Users/adarsh/Documents/testData/featurecsv/31c710ac40/2017/06/22"
 
 val testInput = env.readTextFile(featuresSource).withParameters(config)
 testInput.print()
 
 Please guide how to fix this.
 
 Regards,
 Adarsh
 
>>> 
>>> 
>> 
>> 
> 
> 



Re: Recursive Traversal of the Input Path Directory, Not working

2017-06-23 Thread Adarsh Jain
I am using "val env = ExecutionEnvironment.getExecutionEnvironment", can
this be the problem?

With "import org.apache.flink.api.scala.ExecutionEnvironment"

Using scala in my program.

Regards,
Adarsh

On Fri, Jun 23, 2017 at 3:01 PM, Stefan Richter  wrote:

> I just copy pasted your code, adding the missing "val env
> = LocalEnvironment.createLocalEnvironment()" and exchanged the string
> with a local directory for some test files that I created. No other changes.
>
> Am 23.06.2017 um 11:25 schrieb Adarsh Jain :
>
> Hi Stefan,
>
> Thanks for your efforts in checking the same, still doesn't work for me.
>
> Can you copy paste the code you used maybe I am doing some silly mistake
> and am not able to figure out the same.
>
> Thanks again.
>
> Regards,
> Adarsh
>
>
> On Fri, Jun 23, 2017 at 2:32 PM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> I tried this out on the current master and the 1.3 release and both work
>> for me everything works exactly as expected, for file names, a directory,
>> and even nested directories.
>>
>> Best,
>> Stefan
>>
>> Am 22.06.2017 um 21:13 schrieb Adarsh Jain :
>>
>> Hi Stefan,
>>
>> Yes your understood right, when I give full path till the filename it
>> works fine however when I give path till
>> directory it does not read the data, doesn't print any exceptions too ...
>> I am also not sure why it is behaving like this.
>>
>> Should be easily replicable, in case you can try. Will be really helpful.
>>
>> Regards,
>> Adarsh
>>
>> On Thu, Jun 22, 2017 at 9:00 PM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> I am not sure I am getting the problem right: the code works if you use
>>> a file name, but it does not work for directories? What exactly is not
>>> working? Do you get any exceptions?
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 22.06.2017 um 17:01 schrieb Adarsh Jain :
>>>
>>> Hi,
>>>
>>> I am trying to use "Recursive Traversal of the Input Path Directory" in
>>> Flink 1.3 using scala. Snippet of my code below. If I give exact file name
>>> it is working fine. Ref https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html
>>>
>>> import org.apache.flink.api.java.utils.ParameterTool
>>> import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
>>> import org.apache.flink.configuration.Configuration
>>>
>>> val config = new Configuration
>>> config.setBoolean("recursive.file.enumeration",true)
>>>
>>> val featuresSource: String = "file:///Users/adarsh/Documents/testData/featurecsv/31c710ac40/2017/06/22"
>>>
>>> val testInput = env.readTextFile(featuresSource).withParameters(config)
>>> testInput.print()
>>>
>>> Please guide how to fix this.
>>>
>>> Regards,
>>> Adarsh
>>>
>>>
>>>
>>
>>
>
>


Re: Recursive Traversal of the Input Path Directory, Not working

2017-06-23 Thread Stefan Richter
I just copy pasted your code, adding the missing "val env = 
LocalEnvironment.createLocalEnvironment()" and exchanged the string with a 
local directory for some test files that I created. No other changes.

> Am 23.06.2017 um 11:25 schrieb Adarsh Jain :
> 
> Hi Stefan,
> 
> Thanks for your efforts in checking the same, still doesn't work for me. 
> 
> Can you copy paste the code you used maybe I am doing some silly mistake and 
> am not able to figure out the same.
> 
> Thanks again.
> 
> Regards,
> Adarsh
> 
> 
> On Fri, Jun 23, 2017 at 2:32 PM, Stefan Richter  > wrote:
> Hi,
> 
> I tried this out on the current master and the 1.3 release and both work for 
> me everything works exactly as expected, for file names, a directory, and 
> even nested directories.
> 
> Best,
> Stefan
> 
>> Am 22.06.2017 um 21:13 schrieb Adarsh Jain > >:
>> 
>> Hi Stefan,
>> 
>> Yes your understood right, when I give full path till the filename it works 
>> fine however when I give path till 
>> directory it does not read the data, doesn't print any exceptions too ... I 
>> am also not sure why it is behaving like this.
>> 
>> Should be easily replicable, in case you can try. Will be really helpful.
>> 
>> Regards,
>> Adarsh
>> 
>> On Thu, Jun 22, 2017 at 9:00 PM, Stefan Richter > > wrote:
>> Hi,
>> 
>> I am not sure I am getting the problem right: the code works if you use a 
>> file name, but it does not work for directories? What exactly is not 
>> working? Do you get any exceptions?
>> 
>> Best,
>> Stefan
>> 
>>> Am 22.06.2017 um 17:01 schrieb Adarsh Jain >> >:
>>> 
>>> Hi,
>>> 
>>> I am trying to use "Recursive Traversal of the Input Path Directory" in 
>>> Flink 1.3 using scala. Snippet of my code below. If I give exact file name 
>>> it is working fine. Ref 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html
>>>  
>>> 
>>> 
>>> import org.apache.flink.api.java.utils.ParameterTool
>>> import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
>>> import org.apache.flink.configuration.Configuration
>>> 
>>> val config = new Configuration
>>> config.setBoolean("recursive.file.enumeration",true)
>>> 
>>> val featuresSource: String =
>>> "file:///Users/adarsh/Documents/testData/featurecsv/31c710ac40/2017/06/22"
>>> 
>>> val testInput = env.readTextFile(featuresSource).withParameters(config)
>>> testInput.print()
>>> 
>>> Please guide how to fix this.
>>> 
>>> Regards,
>>> Adarsh
>>> 
>> 
>> 
> 
> 



Re: Recursive Traversal of the Input Path Directory, Not working

2017-06-23 Thread Adarsh Jain
Hi Stefan,

Thanks for your efforts in checking the same, still doesn't work for me.

Can you copy paste the code you used maybe I am doing some silly mistake
and am not able to figure out the same.

Thanks again.

Regards,
Adarsh


On Fri, Jun 23, 2017 at 2:32 PM, Stefan Richter  wrote:

> Hi,
>
> I tried this out on the current master and the 1.3 release and both work
> for me everything works exactly as expected, for file names, a directory,
> and even nested directories.
>
> Best,
> Stefan
>
> Am 22.06.2017 um 21:13 schrieb Adarsh Jain :
>
> Hi Stefan,
>
> Yes your understood right, when I give full path till the filename it
> works fine however when I give path till
> directory it does not read the data, doesn't print any exceptions too ...
> I am also not sure why it is behaving like this.
>
> Should be easily replicable, in case you can try. Will be really helpful.
>
> Regards,
> Adarsh
>
> On Thu, Jun 22, 2017 at 9:00 PM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> I am not sure I am getting the problem right: the code works if you use a
>> file name, but it does not work for directories? What exactly is not
>> working? Do you get any exceptions?
>>
>> Best,
>> Stefan
>>
>> Am 22.06.2017 um 17:01 schrieb Adarsh Jain :
>>
>> Hi,
>>
>> I am trying to use "Recursive Traversal of the Input Path Directory" in
>> Flink 1.3 using scala. Snippet of my code below. If I give exact file name
>> it is working fine. Ref https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html
>>
>> import org.apache.flink.api.java.utils.ParameterTool
>> import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
>> import org.apache.flink.configuration.Configuration
>>
>> val config = new Configuration
>> config.setBoolean("recursive.file.enumeration",true)
>>
>> val featuresSource: String = "file:///Users/adarsh/Documents/testData/featurecsv/31c710ac40/2017/06/22"
>>
>> val testInput = env.readTextFile(featuresSource).withParameters(config)
>> testInput.print()
>>
>> Please guide how to fix this.
>>
>> Regards,
>> Adarsh
>>
>>
>>
>
>


Re: Problem with ProcessFunction timeout feature

2017-06-23 Thread Álvaro Vilaplana García
Well, it sounds very reasonable to me!

I will let you know how it goes.
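
For my own notes, here is a minimal sketch of how I understood the suggestion,
keyed by deviceId (the timeout value and the event type are made up):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Made-up device event; the stream is keyed by deviceId upstream.
class DeviceEvent { public String deviceId; public long eventTime; }

public class DeviceTimeoutDetector extends ProcessFunction<DeviceEvent, String> {

    private static final long TIMEOUT_MS = 60_000L;   // made-up timeout

    private transient ValueState<Long> lastSeen;      // processing time of the last event
    private transient ValueState<String> deviceId;    // remembered so we can report it on timeout

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(new ValueStateDescriptor<>("last-seen", Long.class));
        deviceId = getRuntimeContext().getState(new ValueStateDescriptor<>("device-id", String.class));
    }

    @Override
    public void processElement(DeviceEvent event, Context ctx, Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        lastSeen.update(now);
        deviceId.update(event.deviceId);
        // a processing-time timer fires even if no further events arrive for this device
        ctx.timerService().registerProcessingTimeTimer(now + TIMEOUT_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long seen = lastSeen.value();
        if (seen != null && timestamp >= seen + TIMEOUT_MS) {
            out.collect("device timed out: " + deviceId.value());
        }
    }
}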


2017-06-23 10:05 GMT+01:00 Stefan Richter :

> Yes, exactly. The idea would be, that you operate in event time, but
> combine it with processing time timers to trigger timeout detection. Could
> that help for your case?
>
> Am 23.06.2017 um 10:55 schrieb Álvaro Vilaplana García <
> alvaro.vilapl...@gmail.com>:
>
> Hi Stefan,
>
> You meant
>
> /**
>  * Registers a timer to be fired when processing time passes the given time.
>  *
>  * Timers can internally be scoped to keys and/or windows. When you set a 
> timer
>  * in a keyed context, such as in an operation on
>  * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that 
> context
>  * will also be active when you receive the timer notification.
>  */
> void registerProcessingTimeTimer(long time);
>
>
> Am i right?
>
>
> Cheers
>
>
> 2017-06-23 9:51 GMT+01:00 Álvaro Vilaplana García <
> alvaro.vilapl...@gmail.com>:
>
>> Hi Stefan,
>>
>> Thank you for your knowledge, very appreciated.
>>
>> According with the documentation:
>>
>> void registerEventTimeTimer(long time); -> 'Registers a timer to be fired
>> when the event time watermark passes the given time.'
>>
>> Dont we have the same problem? We would need an event (that event does
>> not come soon) to set the watermark and trigger the timer.
>>
>> Or there is another way of setting the watermark based on the processing
>> time instead of the event time?
>>
>> Cheers
>>
>> 2017-06-23 9:24 GMT+01:00 Stefan Richter :
>>
>>> Hi,
>>>
>>> yes, I think you understood the basic concept of watermarks. Events are
>>> basically driving „the event time clock“, so it can only advance when you
>>> see events. I am not sure if I got the part about partitions correctly, but
>>> the watermark event time is a global thing. For example, if you have
>>> multiple Kafka partitions that your source reads, each partition can have a
>>> different current watermark. However, the source must determine the current
>>> event time of the stream, e.g. as the minimum of the watermarks from all
>>> the Kafka partition it reads.
>>>
>>> One thing that might help for your use case is a combination of event
>>> time and processing time. In the processing function, after each device
>>> event, you could register a timer so far ahead in processing time that it
>>> can serve as a signal to check for time outs because you did not receive
>>> events in a long time.
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 23.06.2017 um 09:51 schrieb Álvaro Vilaplana García <
>>> alvaro.vilapl...@gmail.com>:
>>>
>>> Hi Stefan,
>>>
>>> Thank you so much for your answer.
>>>
>>> Regarding the 'artificial events', our main problem is that we have no
>>> control at all in the devices.
>>>
>>> I have been reading more about event time and watermarks and what I
>>> understood is that when we use event times (device times) Flink does not
>>> know anything about notion of time and the watermark is a way to help Flink
>>> to set the time of the stream (no more events with event time earlier than
>>> the watermark). That would explain that we need always an event to set the
>>> watermark. Does it make sense?
>>>
>>>
>>> I understood that the watermarks will be per partition
>>> (ByKey(deviceId)), is that right?
>>>
>>>
>>> Cheers
>>>
>>> 2017-06-22 16:26 GMT+01:00 Stefan Richter :
>>>
 Hi,

 if I understand correctly, your problem is that event time does not
 progress in case you don’t receive events, so you cannot detect the timeout
 of devices. Would it make sense to have you source periodically send
 artificial events to advance the watermark in the absence of device events,
 with a certain gap for which you can safely assume that you will no longer
 receive events with a smaller timestamp from any device in the future?
 Because, how else could Flink advance event time without receiving further
 events?

 Best,
 Stefan

 > Am 22.06.2017 um 16:35 schrieb Álvaro Vilaplana García <
 alvaro.vilapl...@gmail.com>:
 >
 > Hi,
 >
 > Please, can you help me with a problem? I summarise in the next
 points, I hope is enough clear to approach some help.
 >
 >
 > a) We have devices, each with its own ID, which we don’t have control
 of
 >
 > b) These devices send messages, with an internally generated,
 non-synced (amongst other devices) timestamp
 >
 > c) We want to detect when each devices may stop sending messages
 >
 > d) For that, we are using a ProcessFunction
 >
 > e) The devices put the messages in a Kafka topic, partitioned by ID.
 >
 > f) We are struggling with the ProcessFunction timeout feature:
 >
 > We cannot rely on real time (processing time), since the messages
 from the devices may be delayed (even if their timestamp does not show
 these delays) - so we rely on device timestamps instead.
 > In our case an event comes

Re: Error "key group must belong to the backend" on restore

2017-06-23 Thread Gyula Fóra
Hi,
Thanks for the suggestion, I will definitely try this over the weekend.

I wonder if trying to restore it with parallelism = 1 could magically solve
this problem. Maybe that can give us some additional insights.

Cheers
Gyula

On Fri, Jun 23, 2017, 10:35 Stefan Richter 
wrote:

> Hi,
>
> I had a closer look at those exceptions now, and I would expect so see
> this in the case where there is suddenly a mismatch between the key-group
> range assigned to the keyed backend and the key-groups covered by the state
> handle we try to restore. For example like when the wrong state handle was
> sent to restore a task. What I would suggest for debugging before and after
> migrating, on restore, log the key group ranges of the keyed backends and
> the key group ranges of all the keyed state handles it receives for the
> restore. There should be some change between the original and the migrated
> one and we need to track down which of the two changed and how that change
> was introduced by the converting.
>
> Best,
> Stefan
>
> Am 22.06.2017 um 18:50 schrieb Gyula Fóra :
>
> Thanks Stefan for the tip, in this case I have a Long key so it's unlikely
> that the hash code has changed. And as I mentioned I have several jobs with
> the same exact topolgy which run just fine after migration.
>
> It is super weird... Maybe I am blind to some stupid error, so I'll keep
> looking.
>
> Gyula
>
> Stefan Richter  ezt írta (időpont: 2017.
> jún. 22., Cs, 18:10):
>
>> Hi,
>>
>> I have seen the first exception in cases where the key had no proper and
>> stable hash code method, e.g. when the key was an array. What the first
>> exception basically means is that the backend received a key, which it does
>> not expect because determined by the hash the key belongs to a key group
>> for which the backend is not responsible. My guess would be: the hash of
>> the object has changed between the time the checkpoint was taken and now.
>>
>> Best,
>> Stefan
>>
>> Am 22.06.2017 um 17:48 schrieb Gyula Fóra :
>>
>> Hi all!
>>
>>
>> I am wondering if anyone has any practical idea why I might get this error 
>> when migrating a job from 1.2.1 to 1.3.0? Idea on debugging might help as 
>> well.
>>
>>
>> I have several almost exactly similar jobs (minor config differences) and 
>> all of them succeed except for this single job. I have seen similar error 
>> when trying to change max parallelism but that's not the case here. I am not 
>> changing any parallelism setting.
>>
>>
>> I know this is a long shot but you might have encountered similar.
>>
>>
>> Thanks,
>>
>> Gyula
>>
>>
>> java.lang.IllegalStateException: Could not initialize keyed state backend.
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>  at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.IllegalStateException: The key group must belong to the 
>> backend
>>  at 
>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateData(RocksDBKeyedStateBackend.java:1185)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1100)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1081)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:968)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>>  ... 6 more
>>
>>
>> or
>>
>>
>> java.lang.IllegalArgumentException: Key Group 56 does not belong to the 
>> local range.
>>  at 
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:493)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>>  a

Re: Problem with ProcessFunction timeout feature

2017-06-23 Thread Stefan Richter
Yes, exactly. The idea would be that you operate in event time, but combine it
with processing-time timers to trigger the timeout detection. Could that help for
your case?
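
For illustration, a minimal sketch of that combination might look like the
following (assuming Flink 1.2/1.3, a hypothetical DeviceEvent(deviceId, timestamp)
type and a fixed processing-time timeout; this is only a sketch, not code from this
thread):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

case class DeviceEvent(deviceId: String, timestamp: Long)

// Emits a timeout message once no event has been seen for `timeoutMillis`
// of processing time for the current key (deviceId).
class DeviceTimeoutFunction(timeoutMillis: Long)
    extends ProcessFunction[DeviceEvent, String] {

  // Processing time at which the last event of this key was seen.
  private var lastSeen: ValueState[java.lang.Long] = _
  // The key itself, kept only so the timeout message can name the device.
  private var device: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    lastSeen = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("lastSeen", classOf[java.lang.Long]))
    device = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("device", classOf[String]))
  }

  override def processElement(
      event: DeviceEvent,
      ctx: ProcessFunction[DeviceEvent, String]#Context,
      out: Collector[String]): Unit = {
    val now = ctx.timerService().currentProcessingTime()
    lastSeen.update(now)
    device.update(event.deviceId)
    // Processing time keeps advancing even if no further events (and hence no
    // watermarks) arrive, so this timer will always fire eventually.
    ctx.timerService().registerProcessingTimeTimer(now + timeoutMillis)
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[DeviceEvent, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    // Only report a timeout if no newer event arrived after this timer was set.
    val last = lastSeen.value()
    if (last != null && timestamp >= last + timeoutMillis) {
      out.collect(s"device ${device.value()} timed out after $timeoutMillis ms")
    }
  }
}

Applied to a keyed stream, e.g. something like
events.keyBy(_.deviceId).process(new DeviceTimeoutFunction(60 * 1000L)), the
function reports a device as silent one minute of processing time after its last
event, independently of how the event-time watermark progresses.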

> Am 23.06.2017 um 10:55 schrieb Álvaro Vilaplana García 
> :
> 
> Hi Stefan,
> 
> You meant 
> 
> /**
>  * Registers a timer to be fired when processing time passes the given time.
>  *
>  * Timers can internally be scoped to keys and/or windows. When you set a 
> timer
>  * in a keyed context, such as in an operation on
>  * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that 
> context
>  * will also be active when you receive the timer notification.
>  */
> void registerProcessingTimeTimer(long time);
> 
> Am i right?
> 
> Cheers
> 
> 2017-06-23 9:51 GMT+01:00 Álvaro Vilaplana García  >:
> Hi Stefan,
> 
> Thank you for your knowledge, very appreciated.
> 
> According with the documentation:
> 
> void registerEventTimeTimer(long time); -> 'Registers a timer to be fired 
> when the event time watermark passes the given time.' 
> 
> Dont we have the same problem? We would need an event (that event does not 
> come soon) to set the watermark and trigger the timer.
> 
> Or there is another way of setting the watermark based on the processing time 
> instead of the event time?
> 
> Cheers
> 
> 2017-06-23 9:24 GMT+01:00 Stefan Richter  >:
> Hi,
> 
> yes, I think you understood the basic concept of watermarks. Events are 
> basically driving „the event time clock“, so it can only advance when you see 
> events. I am not sure if I got the part about partitions correctly, but the 
> watermark event time is a global thing. For example, if you have multiple 
> Kafka partitions that your source reads, each partition can have a different 
> current watermark. However, the source must determine the current event time 
> of the stream, e.g. as the minimum of the watermarks from all the Kafka 
> partition it reads.
> 
> One thing that might help for your use case is a combination of event time 
> and processing time. In the processing function, after each device event, you 
> could register a timer so far ahead in processing time that it can serve as a 
> signal to check for time outs because you did not receive events in a long 
> time.
> 
> Best,
> Stefan
> 
>> Am 23.06.2017 um 09:51 schrieb Álvaro Vilaplana García 
>> <alvaro.vilapl...@gmail.com>:
>> 
>> Hi Stefan,
>> 
>> Thank you so much for your answer.
>> 
>> Regarding the 'artificial events', our main problem is that we have no 
>> control at all in the devices.
>> 
>> I have been reading more about event time and watermarks and what I 
>> understood is that when we use event times (device times) Flink does not 
>> know anything about notion of time and the watermark is a way to help Flink 
>> to set the time of the stream (no more events with event time earlier than 
>> the watermark). That would explain that we need always an event to set the 
>> watermark. Does it make sense?
>> 
>> 
>> I understood that the watermarks will be per partition (ByKey(deviceId)), is 
>> that right?  
>> 
>> 
>> Cheers
>> 
>> 2017-06-22 16:26 GMT+01:00 Stefan Richter > >:
>> Hi,
>> 
>> if I understand correctly, your problem is that event time does not progress 
>> in case you don’t receive events, so you cannot detect the timeout of 
>> devices. Would it make sense to have you source periodically send artificial 
>> events to advance the watermark in the absence of device events, with a 
>> certain gap for which you can safely assume that you will no longer receive 
>> events with a smaller timestamp from any device in the future? Because, how 
>> else could Flink advance event time without receiving further events?
>> 
>> Best,
>> Stefan
>> 
>> > Am 22.06.2017 um 16:35 schrieb Álvaro Vilaplana García 
>> > <alvaro.vilapl...@gmail.com>:
>> >
>> > Hi,
>> >
>> > Please, can you help me with a problem? I summarise in the next points, I 
>> > hope is enough clear to approach some help.
>> >
>> >
>> > a) We have devices, each with its own ID, which we don’t have control of
>> >
>> > b) These devices send messages, with an internally generated, non-synced 
>> > (amongst other devices) timestamp
>> >
>> > c) We want to detect when each devices may stop sending messages
>> >
>> > d) For that, we are using a ProcessFunction
>> >
>> > e) The devices put the messages in a Kafka topic, partitioned by ID.
>> >
>> > f) We are struggling with the ProcessFunction timeout feature:
>> >
>> > We cannot rely on real time (processing time), since the messages from the 
>> > devices may be delayed (even if their timestamp does not show these 
>> > delays) - so we rely on device timestamps instead.
>> > In our case an event comes in which: "Registers a timer to be fired when 
>> > the event time watermark passes the given time". The problem we have is 
>> > there are cases where we do not get a

Re: Recursive Traversal of the Input Path Directory, Not working

2017-06-23 Thread Stefan Richter
Hi,

I tried this out on the current master and the 1.3 release, and both work for me;
everything works exactly as expected for file names, a directory, and even
nested directories.
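
For completeness, a self-contained sketch of such a test could look roughly like
this (batch API, Flink 1.3; the directory path is only a placeholder, and this is
not necessarily the exact code that was run):

import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object RecursiveReadTest {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Enable recursive enumeration of nested input directories.
    val config = new Configuration
    config.setBoolean("recursive.file.enumeration", true)

    // Placeholder: point this at a directory that contains (nested) text files.
    val featuresSource = "file:///tmp/featurecsv"

    val testInput = env.readTextFile(featuresSource).withParameters(config)
    testInput.print()
  }
}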

Best,
Stefan

> Am 22.06.2017 um 21:13 schrieb Adarsh Jain :
> 
> Hi Stefan,
> 
> Yes your understood right, when I give full path till the filename it works 
> fine however when I give path till 
> directory it does not read the data, doesn't print any exceptions too ... I 
> am also not sure why it is behaving like this.
> 
> Should be easily replicable, in case you can try. Will be really helpful.
> 
> Regards,
> Adarsh
> 
> On Thu, Jun 22, 2017 at 9:00 PM, Stefan Richter  > wrote:
> Hi,
> 
> I am not sure I am getting the problem right: the code works if you use a 
> file name, but it does not work for directories? What exactly is not working? 
> Do you get any exceptions?
> 
> Best,
> Stefan
> 
>> Am 22.06.2017 um 17:01 schrieb Adarsh Jain > >:
>> 
>> Hi,
>> 
>> I am trying to use "Recursive Traversal of the Input Path Directory" in 
>> Flink 1.3 using scala. Snippet of my code below. If I give exact file name 
>> it is working fine. Ref 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html
>>  
>> 
>> 
>> import org.apache.flink.api.java.utils.ParameterTool
>> import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
>> import org.apache.flink.configuration.Configuration
>> 
>> val config = new Configuration
>> config.setBoolean("recursive.file.enumeration",true)
>> 
>> val featuresSource: String = 
>> "file:///Users/adarsh/Documents/testData/featurecsv/31c710ac40/2017/06/22 <>"
>> 
>> val testInput = env.readTextFile(featuresSource).withParameters(config)
>> testInput.print()
>> 
>> Please guide how to fix this.
>> 
>> Regards,
>> Adarsh
>> 
> 
> 



Re: Problem with ProcessFunction timeout feature

2017-06-23 Thread Álvaro Vilaplana García
Hi Stefan,

You meant

/**
 * Registers a timer to be fired when processing time passes the given time.
 *
 * Timers can internally be scoped to keys and/or windows. When you
set a timer
 * in a keyed context, such as in an operation on
 * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then
that context
 * will also be active when you receive the timer notification.
 */
void registerProcessingTimeTimer(long time);


Am I right?


Cheers


2017-06-23 9:51 GMT+01:00 Álvaro Vilaplana García <
alvaro.vilapl...@gmail.com>:

> Hi Stefan,
>
> Thank you for your knowledge, very appreciated.
>
> According with the documentation:
>
> void registerEventTimeTimer(long time); -> 'Registers a timer to be fired
> when the event time watermark passes the given time.'
>
> Dont we have the same problem? We would need an event (that event does not
> come soon) to set the watermark and trigger the timer.
>
> Or there is another way of setting the watermark based on the processing
> time instead of the event time?
>
> Cheers
>
> 2017-06-23 9:24 GMT+01:00 Stefan Richter :
>
>> Hi,
>>
>> yes, I think you understood the basic concept of watermarks. Events are
>> basically driving „the event time clock“, so it can only advance when you
>> see events. I am not sure if I got the part about partitions correctly, but
>> the watermark event time is a global thing. For example, if you have
>> multiple Kafka partitions that your source reads, each partition can have a
>> different current watermark. However, the source must determine the current
>> event time of the stream, e.g. as the minimum of the watermarks from all
>> the Kafka partition it reads.
>>
>> One thing that might help for your use case is a combination of event
>> time and processing time. In the processing function, after each device
>> event, you could register a timer so far ahead in processing time that it
>> can serve as a signal to check for time outs because you did not receive
>> events in a long time.
>>
>> Best,
>> Stefan
>>
>> Am 23.06.2017 um 09:51 schrieb Álvaro Vilaplana García <
>> alvaro.vilapl...@gmail.com>:
>>
>> Hi Stefan,
>>
>> Thank you so much for your answer.
>>
>> Regarding the 'artificial events', our main problem is that we have no
>> control at all in the devices.
>>
>> I have been reading more about event time and watermarks and what I
>> understood is that when we use event times (device times) Flink does not
>> know anything about notion of time and the watermark is a way to help Flink
>> to set the time of the stream (no more events with event time earlier than
>> the watermark). That would explain that we need always an event to set the
>> watermark. Does it make sense?
>>
>>
>> I understood that the watermarks will be per partition (ByKey(deviceId)),
>> is that right?
>>
>>
>> Cheers
>>
>> 2017-06-22 16:26 GMT+01:00 Stefan Richter :
>>
>>> Hi,
>>>
>>> if I understand correctly, your problem is that event time does not
>>> progress in case you don’t receive events, so you cannot detect the timeout
>>> of devices. Would it make sense to have you source periodically send
>>> artificial events to advance the watermark in the absence of device events,
>>> with a certain gap for which you can safely assume that you will no longer
>>> receive events with a smaller timestamp from any device in the future?
>>> Because, how else could Flink advance event time without receiving further
>>> events?
>>>
>>> Best,
>>> Stefan
>>>
>>> > Am 22.06.2017 um 16:35 schrieb Álvaro Vilaplana García <
>>> alvaro.vilapl...@gmail.com>:
>>> >
>>> > Hi,
>>> >
>>> > Please, can you help me with a problem? I summarise in the next
>>> points, I hope is enough clear to approach some help.
>>> >
>>> >
>>> > a) We have devices, each with its own ID, which we don’t have control
>>> of
>>> >
>>> > b) These devices send messages, with an internally generated,
>>> non-synced (amongst other devices) timestamp
>>> >
>>> > c) We want to detect when each devices may stop sending messages
>>> >
>>> > d) For that, we are using a ProcessFunction
>>> >
>>> > e) The devices put the messages in a Kafka topic, partitioned by ID.
>>> >
>>> > f) We are struggling with the ProcessFunction timeout feature:
>>> >
>>> > We cannot rely on real time (processing time), since the messages from
>>> the devices may be delayed (even if their timestamp does not show these
>>> delays) - so we rely on device timestamps instead.
>>> > In our case an event comes in which: "Registers a timer to be fired
>>> when the event time watermark passes the given time". The problem we have
>>> is there are cases where we do not get an additional event after the first
>>> event- which means that the original event timeouts are not triggered.
>>> >
>>> > As a side note we've seen in unit tests that flink seems to set a
>>> watermark after the last event with a Long.MaxValue (9223372036854775807) -
>>> which hides the above problem.
>>> >
>>> > I am using Scala 2.11 /Flink versions 1.2.0
>>> >
>>> > Regar

Re: Problem with ProcessFunction timeout feature

2017-06-23 Thread Álvaro Vilaplana García
Hi Stefan,

Thank you for sharing your knowledge, much appreciated.

According to the documentation:

void registerEventTimeTimer(long time); -> 'Registers a timer to be fired
when the event time watermark passes the given time.'

Don't we have the same problem? We would still need an event (and that event may
not come soon) to set the watermark and trigger the timer.

Or is there another way of setting the watermark based on processing time
instead of event time?

Cheers

2017-06-23 9:24 GMT+01:00 Stefan Richter :

> Hi,
>
> yes, I think you understood the basic concept of watermarks. Events are
> basically driving „the event time clock“, so it can only advance when you
> see events. I am not sure if I got the part about partitions correctly, but
> the watermark event time is a global thing. For example, if you have
> multiple Kafka partitions that your source reads, each partition can have a
> different current watermark. However, the source must determine the current
> event time of the stream, e.g. as the minimum of the watermarks from all
> the Kafka partition it reads.
>
> One thing that might help for your use case is a combination of event time
> and processing time. In the processing function, after each device event,
> you could register a timer so far ahead in processing time that it can
> serve as a signal to check for time outs because you did not receive events
> in a long time.
>
> Best,
> Stefan
>
> Am 23.06.2017 um 09:51 schrieb Álvaro Vilaplana García <
> alvaro.vilapl...@gmail.com>:
>
> Hi Stefan,
>
> Thank you so much for your answer.
>
> Regarding the 'artificial events', our main problem is that we have no
> control at all in the devices.
>
> I have been reading more about event time and watermarks and what I
> understood is that when we use event times (device times) Flink does not
> know anything about notion of time and the watermark is a way to help Flink
> to set the time of the stream (no more events with event time earlier than
> the watermark). That would explain that we need always an event to set the
> watermark. Does it make sense?
>
>
> I understood that the watermarks will be per partition (ByKey(deviceId)),
> is that right?
>
>
> Cheers
>
> 2017-06-22 16:26 GMT+01:00 Stefan Richter :
>
>> Hi,
>>
>> if I understand correctly, your problem is that event time does not
>> progress in case you don’t receive events, so you cannot detect the timeout
>> of devices. Would it make sense to have you source periodically send
>> artificial events to advance the watermark in the absence of device events,
>> with a certain gap for which you can safely assume that you will no longer
>> receive events with a smaller timestamp from any device in the future?
>> Because, how else could Flink advance event time without receiving further
>> events?
>>
>> Best,
>> Stefan
>>
>> > Am 22.06.2017 um 16:35 schrieb Álvaro Vilaplana García <
>> alvaro.vilapl...@gmail.com>:
>> >
>> > Hi,
>> >
>> > Please, can you help me with a problem? I summarise in the next points,
>> I hope is enough clear to approach some help.
>> >
>> >
>> > a) We have devices, each with its own ID, which we don’t have control of
>> >
>> > b) These devices send messages, with an internally generated,
>> non-synced (amongst other devices) timestamp
>> >
>> > c) We want to detect when each devices may stop sending messages
>> >
>> > d) For that, we are using a ProcessFunction
>> >
>> > e) The devices put the messages in a Kafka topic, partitioned by ID.
>> >
>> > f) We are struggling with the ProcessFunction timeout feature:
>> >
>> > We cannot rely on real time (processing time), since the messages from
>> the devices may be delayed (even if their timestamp does not show these
>> delays) - so we rely on device timestamps instead.
>> > In our case an event comes in which: "Registers a timer to be fired
>> when the event time watermark passes the given time". The problem we have
>> is there are cases where we do not get an additional event after the first
>> event- which means that the original event timeouts are not triggered.
>> >
>> > As a side note we've seen in unit tests that flink seems to set a
>> watermark after the last event with a Long.MaxValue (9223372036854775807) -
>> which hides the above problem.
>> >
>> > I am using Scala 2.11 /Flink versions 1.2.0
>> >
>> > Regards
>> > --
>> > __
>> >
>> > Álvaro Vilaplana García
>>
>>
>
>
> --
> __
>
> Álvaro Vilaplana García
>
>
>


-- 
__

Álvaro Vilaplana García


Re: Error "key group must belong to the backend" on restore

2017-06-23 Thread Stefan Richter
Hi,

I had a closer look at those exceptions now, and I would expect to see this in a
case where there is suddenly a mismatch between the key-group range assigned to the
keyed backend and the key groups covered by the state handle we try to restore, for
example when the wrong state handle was sent to restore a task. What I would suggest
for debugging: before and after migrating, on restore, log the key-group ranges of
the keyed backends and the key-group ranges of all the keyed state handles each
backend receives for the restore. There should be some change between the original
and the migrated job, and we need to track down which of the two changed and how
that change was introduced by the conversion.
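
To make that a bit more concrete, here is a small offline sanity check one could run
(assuming the KeyGroupRangeAssignment utility from flink-runtime with the 1.2/1.3
signatures; maxParallelism and parallelism are just example values):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object KeyGroupCheck {
  def main(args: Array[String]): Unit = {
    val maxParallelism = 128 // example; must match the job's max parallelism
    val parallelism = 4      // example; parallelism used on restore

    // A Long key maps to the same key group as long as maxParallelism is unchanged.
    val key: java.lang.Long = 42L
    val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)
    println(s"key $key -> key group $keyGroup")

    // Key-group range each operator subtask is responsible for.
    for (subtask <- 0 until parallelism) {
      val range = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
        maxParallelism, parallelism, subtask)
      println(s"subtask $subtask -> key groups [${range.getStartKeyGroup}, ${range.getEndKeyGroup}]")
    }
  }
}

Comparing this output (and the ranges logged during restore) before and after the
migration, including a run with parallelism = 1, should show whether the assignment
itself changed or whether a state handle is being routed to the wrong backend.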

Best,
Stefan  

> Am 22.06.2017 um 18:50 schrieb Gyula Fóra :
> 
> Thanks Stefan for the tip, in this case I have a Long key so it's unlikely 
> that the hash code has changed. And as I mentioned I have several jobs with 
> the same exact topolgy which run just fine after migration.
> 
> It is super weird... Maybe I am blind to some stupid error, so I'll keep 
> looking.
> 
> Gyula
> 
> Stefan Richter  > ezt írta (időpont: 2017. jún. 22., Cs, 
> 18:10):
> Hi,
> 
> I have seen the first exception in cases where the key had no proper and 
> stable hash code method, e.g. when the key was an array. What the first 
> exception basically means is that the backend received a key, which it does 
> not expect because determined by the hash the key belongs to a key group for 
> which the backend is not responsible. My guess would be: the hash of the 
> object has changed between the time the checkpoint was taken and now.
> 
> Best,
> Stefan
> 
>> Am 22.06.2017 um 17:48 schrieb Gyula Fóra > >:
>> 
>> Hi all!
>> 
>> I am wondering if anyone has any practical idea why I might get this error 
>> when migrating a job from 1.2.1 to 1.3.0? Idea on debugging might help as 
>> well.
>> 
>> I have several almost exactly similar jobs (minor config differences) and 
>> all of them succeed except for this single job. I have seen similar error 
>> when trying to change max parallelism but that's not the case here. I am not 
>> changing any parallelism setting.
>> 
>> I know this is a long shot but you might have encountered similar.
>> 
>> Thanks,
>> Gyula
>> 
>> java.lang.IllegalStateException: Could not initialize keyed state backend.
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>  at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.IllegalStateException: The key group must belong to the 
>> backend
>>  at 
>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateData(RocksDBKeyedStateBackend.java:1185)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1100)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1081)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:968)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>>  ... 6 more
>> 
>> or 
>> 
>> java.lang.IllegalArgumentException: Key Group 56 does not belong to the 
>> local range.
>>  at 
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:493)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>>  at 
>> org.apache.flink.streaming

Re: Different Window Sizes in keyed stream

2017-06-23 Thread Ahmad Hassan
Thanks Fabian for the advice!

Best Regards,

Dr. Ahmad Hassan

On 23 June 2017 at 09:05, Fabian Hueske  wrote:

> Hi Ahmad,
>
> that is not possible, at least not with Flink's built-in windows.
> You can probably implement something like that on top of the DataStream
> API but I think it would quite a bit of effort.
>
> IMO, the better approach would be to start a separate Flink job per
> tenant. This would also improve the isolation and failure behavior.
>
> Best, Fabian
>
> 2017-06-22 19:43 GMT+02:00 Ahmad Hassan :
>
>> Hi All,
>>
>> I want to know if flink allows to define sliding window size and slide
>> time on the fly. For example I want to configure sliding window of size 2
>> min and slide 1 min for tenant A but size 10 min and slide min for tenant B
>> in a keyed stream and so on for other tenants. My code is below.
>>
>> final DataStream eventStream = inputStream
>> .keyBy(TENANT, CATEGORY)
>> .window(SlidingProcessingTimeWindows.of(Time.minutes(2), Time.minutes(1)))
>> .fold(new WindowStats(), newProductAggregationMapper(),
>> newProductAggregationWindowFunction());
>>
>> Can I do that for unlimited number of tenants in flink ?
>>
>> Cheers,
>>
>> Dr. Ahmad Hassan
>
>
>


Re: Problem with ProcessFunction timeout feature

2017-06-23 Thread Stefan Richter
Hi,

yes, I think you understood the basic concept of watermarks. Events are 
basically driving „the event time clock“, so it can only advance when you see 
events. I am not sure if I got the part about partitions correctly, but the 
watermark event time is a global thing. For example, if you have multiple Kafka 
partitions that your source reads, each partition can have a different current 
watermark. However, the source must determine the current event time of the 
stream, e.g. as the minimum of the watermarks from all the Kafka partition it 
reads.
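
As an illustration of how the event timestamps drive that clock, a timestamp and
watermark assigner for the device stream could look roughly like this (a sketch,
assuming a hypothetical DeviceEvent(deviceId, timestamp) type and allowing events
to arrive up to 10 seconds out of order):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

case class DeviceEvent(deviceId: String, timestamp: Long)

// Uses the device timestamp as event time; the watermark trails the highest
// timestamp seen so far by 10 seconds, so it only advances when events arrive.
class DeviceTimestampExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[DeviceEvent](Time.seconds(10)) {
  override def extractTimestamp(event: DeviceEvent): Long = event.timestamp
}

// Usage (sketch):
// val withTimestamps = deviceStream.assignTimestampsAndWatermarks(new DeviceTimestampExtractor)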

One thing that might help for your use case is a combination of event time and 
processing time. In the processing function, after each device event, you could 
register a timer so far ahead in processing time that it can serve as a signal 
to check for time outs because you did not receive events in a long time.

Best,
Stefan

> Am 23.06.2017 um 09:51 schrieb Álvaro Vilaplana García 
> :
> 
> Hi Stefan,
> 
> Thank you so much for your answer.
> 
> Regarding the 'artificial events', our main problem is that we have no 
> control at all in the devices.
> 
> I have been reading more about event time and watermarks and what I 
> understood is that when we use event times (device times) Flink does not know 
> anything about notion of time and the watermark is a way to help Flink to set 
> the time of the stream (no more events with event time earlier than the 
> watermark). That would explain that we need always an event to set the 
> watermark. Does it make sense?
> 
> 
> I understood that the watermarks will be per partition (ByKey(deviceId)), is 
> that right?  
> 
> 
> Cheers
> 
> 2017-06-22 16:26 GMT+01:00 Stefan Richter  >:
> Hi,
> 
> if I understand correctly, your problem is that event time does not progress 
> in case you don’t receive events, so you cannot detect the timeout of 
> devices. Would it make sense to have you source periodically send artificial 
> events to advance the watermark in the absence of device events, with a 
> certain gap for which you can safely assume that you will no longer receive 
> events with a smaller timestamp from any device in the future? Because, how 
> else could Flink advance event time without receiving further events?
> 
> Best,
> Stefan
> 
> > Am 22.06.2017 um 16:35 schrieb Álvaro Vilaplana García 
> > <alvaro.vilapl...@gmail.com>:
> >
> > Hi,
> >
> > Please, can you help me with a problem? I summarise in the next points, I 
> > hope is enough clear to approach some help.
> >
> >
> > a) We have devices, each with its own ID, which we don’t have control of
> >
> > b) These devices send messages, with an internally generated, non-synced 
> > (amongst other devices) timestamp
> >
> > c) We want to detect when each devices may stop sending messages
> >
> > d) For that, we are using a ProcessFunction
> >
> > e) The devices put the messages in a Kafka topic, partitioned by ID.
> >
> > f) We are struggling with the ProcessFunction timeout feature:
> >
> > We cannot rely on real time (processing time), since the messages from the 
> > devices may be delayed (even if their timestamp does not show these delays) 
> > - so we rely on device timestamps instead.
> > In our case an event comes in which: "Registers a timer to be fired when 
> > the event time watermark passes the given time". The problem we have is 
> > there are cases where we do not get an additional event after the first 
> > event- which means that the original event timeouts are not triggered.
> >
> > As a side note we've seen in unit tests that flink seems to set a watermark 
> > after the last event with a Long.MaxValue (9223372036854775807) - which 
> > hides the above problem.
> >
> > I am using Scala 2.11 /Flink versions 1.2.0
> >
> > Regards
> > --
> > __
> >
> > Álvaro Vilaplana García
> 
> 
> 
> 
> -- 
> __
> 
> Álvaro Vilaplana García



Re: Different Window Sizes in keyed stream

2017-06-23 Thread Fabian Hueske
Hi Ahmad,

that is not possible, at least not with Flink's built-in windows.
You can probably implement something like that on top of the DataStream API,
but I think it would take quite a bit of effort.

IMO, the better approach would be to start a separate Flink job per tenant.
This would also improve the isolation and failure behavior.
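
To sketch what the job-per-tenant approach could look like, here is a rough outline
in which window size and slide arrive as job parameters (the Event type, field names
and parameter keys are placeholders, not code from this thread):

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class Event(tenant: String, category: String, amount: Double)

object TenantWindowJob {
  def main(args: Array[String]): Unit = {
    // Per-tenant settings are passed at submission time, e.g.
    //   --tenant A --windowMinutes 2 --slideMinutes 1
    val params = ParameterTool.fromArgs(args)
    val tenant = params.get("tenant", "A")
    val windowMinutes = params.getLong("windowMinutes", 2L)
    val slideMinutes = params.getLong("slideMinutes", 1L)

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder source; in practice this would be the shared input (e.g. Kafka).
    val inputStream: DataStream[Event] = env.fromElements(
      Event("A", "books", 10.0), Event("A", "books", 5.0))

    inputStream
      .filter(_.tenant == tenant)
      .keyBy(e => (e.tenant, e.category))
      .window(SlidingProcessingTimeWindows.of(
        Time.minutes(windowMinutes), Time.minutes(slideMinutes)))
      .reduce((a, b) => Event(a.tenant, a.category, a.amount + b.amount))
      .print()

    env.execute(s"windows-for-tenant-$tenant")
  }
}

The window code stays identical across tenants; only the submitted parameters
differ, which is what makes the per-tenant job approach workable without custom
window logic.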

Best, Fabian

2017-06-22 19:43 GMT+02:00 Ahmad Hassan :

> Hi All,
>
> I want to know if flink allows to define sliding window size and slide
> time on the fly. For example I want to configure sliding window of size 2
> min and slide 1 min for tenant A but size 10 min and slide min for tenant B
> in a keyed stream and so on for other tenants. My code is below.
>
> final DataStream eventStream = inputStream
> .keyBy(TENANT, CATEGORY)
> .window(SlidingProcessingTimeWindows.of(Time.minutes(2), Time.minutes(1)))
> .fold(new WindowStats(), newProductAggregationMapper(),
> newProductAggregationWindowFunction());
>
> Can I do that for unlimited number of tenants in flink ?
>
> Cheers,
>
> Dr. Ahmad Hassan


Re: Problem with ProcessFunction timeout feature

2017-06-23 Thread Álvaro Vilaplana García
Hi Stefan,

Thank you so much for your answer.

Regarding the 'artificial events', our main problem is that we have no
control at all over the devices.

I have been reading more about event time and watermarks, and what I
understood is that when we use event times (device times) Flink does not
know anything about the notion of time by itself, and the watermark is a way to
help Flink set the time of the stream (no more events with an event time earlier
than the watermark are expected). That would explain why we always need an event
to set the watermark. Does that make sense?


I understood that the watermarks will be per partition (keyBy(deviceId)),
is that right?


Cheers

2017-06-22 16:26 GMT+01:00 Stefan Richter :

> Hi,
>
> if I understand correctly, your problem is that event time does not
> progress in case you don’t receive events, so you cannot detect the timeout
> of devices. Would it make sense to have you source periodically send
> artificial events to advance the watermark in the absence of device events,
> with a certain gap for which you can safely assume that you will no longer
> receive events with a smaller timestamp from any device in the future?
> Because, how else could Flink advance event time without receiving further
> events?
>
> Best,
> Stefan
>
> > Am 22.06.2017 um 16:35 schrieb Álvaro Vilaplana García <
> alvaro.vilapl...@gmail.com>:
> >
> > Hi,
> >
> > Please, can you help me with a problem? I summarise in the next points,
> I hope is enough clear to approach some help.
> >
> >
> > a) We have devices, each with its own ID, which we don’t have control of
> >
> > b) These devices send messages, with an internally generated, non-synced
> (amongst other devices) timestamp
> >
> > c) We want to detect when each devices may stop sending messages
> >
> > d) For that, we are using a ProcessFunction
> >
> > e) The devices put the messages in a Kafka topic, partitioned by ID.
> >
> > f) We are struggling with the ProcessFunction timeout feature:
> >
> > We cannot rely on real time (processing time), since the messages from
> the devices may be delayed (even if their timestamp does not show these
> delays) - so we rely on device timestamps instead.
> > In our case an event comes in which: "Registers a timer to be fired when
> the event time watermark passes the given time". The problem we have is
> there are cases where we do not get an additional event after the first
> event- which means that the original event timeouts are not triggered.
> >
> > As a side note we've seen in unit tests that flink seems to set a
> watermark after the last event with a Long.MaxValue (9223372036854775807) -
> which hides the above problem.
> >
> > I am using Scala 2.11 /Flink versions 1.2.0
> >
> > Regards
> > --
> > __
> >
> > Álvaro Vilaplana García
>
>


-- 
__

Álvaro Vilaplana García