Re: Any remote opportunity to work on Flink project?

2018-06-05 Thread Christophe Salperwyck
Still, some people are willing to pay others to build a product around
Flink :-)

I'm also interested in Flink and online ML!

Cheers,
Christophe

On Wed, 6 Jun 2018 at 07:40, Garvit Sharma  wrote:

> Flink is OpenSource!!
>
> On Wed, Jun 6, 2018 at 10:45 AM, Deepak Sharma 
> wrote:
>
>> Hi Flink Users,
>> Sorry to spam your inbox and GM all.
>> I am looking for opportunity to work on Flink project , specifically if
>> its Flink ML over streaming
>> Please do let me know if anyone is looking for freelancers around any of
>> their Flink projects.
>>
>> --
>> Thanks
>> Deepak
>>
>
>
>
> --
>
> Garvit Sharma
> github.com/garvitlnmiit/
>
> No Body is a Scholar by birth, its only hard work and strong determination
> that makes him master.
>


Re: Any remote opportunity to work on Flink project?

2018-06-05 Thread Garvit Sharma
Flink is OpenSource!!

On Wed, Jun 6, 2018 at 10:45 AM, Deepak Sharma 
wrote:

> Hi Flink Users,
> Sorry to spam your inbox and GM all.
> I am looking for opportunity to work on Flink project , specifically if
> its Flink ML over streaming
> Please do let me know if anyone is looking for freelancers around any of
> their Flink projects.
>
> --
> Thanks
> Deepak
>



-- 

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination
that makes him master.


Any remote opportunity to work on Flink project?

2018-06-05 Thread Deepak Sharma
Hi Flink Users,
Sorry to spam your inbox and GM all.
I am looking for an opportunity to work on a Flink project, specifically if
it is Flink ML over streaming.
Please do let me know if anyone is looking for freelancers for any of
their Flink projects.

-- 
Thanks
Deepak


Can not run scala-shell in yarn mode in flink 1.5

2018-06-05 Thread Jeff Zhang
I tried to run the scala-shell in yarn mode in 1.5, but hit the following error.
It runs successfully in 1.4.2, and the error is the same even when I change the
mode to legacy. Is this a known issue, or did something change in 1.5? Thanks

Command I Use: bin/start-scala-shell.sh yarn -n 1


Starting Flink Shell:
2018-06-06 12:30:02,672 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, localhost
2018-06-06 12:30:02,673 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2018-06-06 12:30:02,674 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.mb, 1024
2018-06-06 12:30:02,674 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.mb, 1024
2018-06-06 12:30:02,674 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-06-06 12:30:02,674 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2018-06-06 12:30:02,675 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: rest.port, 8081
Exception in thread "main" java.lang.UnsupportedOperationException: Can't
deploy a standalone cluster.
at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
at
org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
at
org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
at
org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)


Re: [DISCUSS] Flink 1.6 features

2018-06-05 Thread Yan Zhou [FDS Science]
+1 on https://issues.apache.org/jira/browse/FLINK-5479

[FLINK-5479] Per-partition watermarks in FlinkKafkaConsumer should consider
idle partitions (issues.apache.org)
Reported in ML:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
"It's normally not a common case to have Kafka partitions not producing any
data, but it'll probably be good to handle this as well. I ..."



From: Rico Bergmann 
Sent: Tuesday, June 5, 2018 9:12:00 PM
To: Hao Sun
Cc: d...@flink.apache.org; user
Subject: Re: [DISCUSS] Flink 1.6 features

+1 on K8s integration



Am 06.06.2018 um 00:01 schrieb Hao Sun 
mailto:ha...@zendesk.com>>:

adding my vote to K8S Job mode, maybe it is this?
> Smoothen the integration in Container environment, like "Flink as a Library", 
> and easier integration with Kubernetes services and other proxies.



On Mon, Jun 4, 2018 at 11:01 PM Ben Yan 
mailto:yan.xiao.bin.m...@gmail.com>> wrote:
Hi Stephan,

Will  [ https://issues.apache.org/jira/browse/FLINK-5479 ]  (Per-partition 
watermarks in FlinkKafkaConsumer should consider idle partitions) be included 
in 1.6? As we are seeing more users with this issue on the mailing lists.

Thanks.
Ben

2018-06-05 5:29 GMT+08:00 Che Lui Shum 
mailto:sh...@us.ibm.com>>:

Hi Stephan,

Will FLINK-7129 (Support dynamically changing CEP patterns) be included in 1.6? 
There were discussions about possibly including it in 1.6:
http://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3cCAMq=ou7gru2o9jtowxn1lc1f7nkcxayn6a3e58kxctb4b50...@mail.gmail.com%3e

Thanks,
Shirley Shum


From: Stephan Ewen mailto:se...@apache.org>>
To: d...@flink.apache.org, user 
mailto:user@flink.apache.org>>
Date: 06/04/2018 02:21 AM
Subject: [DISCUSS] Flink 1.6 features





Hi Flink Community!

The release of Apache Flink 1.5 has happened (yay!) - so it is a good time to 
start talking about what to do for release 1.6.

== Suggested release timeline ==

I would propose to release around end of July (that is 8-9 weeks from now).

The rationale behind that: There was a lot of effort in release testing
automation (end-to-end tests, scripted stress tests) as part of release 1.5. 
You may have noticed the big set of new modules under "flink-end-to-end-tests" 
in the Flink repository. It delayed the 1.5 release a bit, and needs to 
continue as part of the coming release cycle, but should help make releasing 
more lightweight from now on.

(Side note: There are also some nightly stress tests that we created and run at 
data Artisans, and where we are looking whether and in which way it would make 
sense to contribute them to Flink.)

== Features and focus areas ==

We had a lot of big and heavy features in Flink 1.5, with FLIP-6, the new 
network stack, recovery, SQL joins and client, ... Following something like a 
"tick-tock-model", I would suggest to focus the next release more on 
integrations, tooling, and reducing user friction.

Of course, this does not mean that no other pull request gets reviewed, and no
other topic will be examined - it is simply meant as a help to understand where 
to expect more activity during the next release cycle. Note that these are 
really the coarse focus areas - don't read this as a comprehensive list.

This list is my first suggestion, based on discussions with committers, users, 
and mailing list questions.

  - Support Java 9 and Scala 2.12

  - Smoothen the integration in Container environment, like "Flink as a 
Library", and easier integration with Kubernetes services and other proxies.

  - Polish the remaining parts of the FLIP-6 rewrite

  - Improve state backends with asynchronous timer snapshots, efficient timer 
deletes, state TTL, and broadcast state support in RocksDB.

  - Extend Streaming Sinks:
 - Bucketing Sink should support S3 properly (compensate for eventual 
consistency), work with Flink's shaded S3 file systems, and efficiently support 
formats that compress/index across individual rows (Parquet, ORC, ...)
 - Support ElasticSearch's new REST API

  - Smoothen State Evolution to support type conversion on snapshot restore

  - Enhance Stream SQL and CEP
 - Add support for "update by key" Table Sources
 - Add more table sources and sinks (Kafka, Kinesis, Files, K/V stores)
 - Expand SQL client
 - Integrate CEP and SQL, through MATCH_RECOGNIZE clause
 - Improve CEP Performance of SharedBuffer on RocksDB






Re: [DISCUSS] Flink 1.6 features

2018-06-05 Thread Rico Bergmann
+1 on K8s integration 



> Am 06.06.2018 um 00:01 schrieb Hao Sun :
> 
> adding my vote to K8S Job mode, maybe it is this?
> > Smoothen the integration in Container environment, like "Flink as a 
> > Library", and easier integration with Kubernetes services and other proxies.
> 
> 
> 
>> On Mon, Jun 4, 2018 at 11:01 PM Ben Yan  wrote:
>> Hi Stephan,
>> 
>> Will  [ https://issues.apache.org/jira/browse/FLINK-5479 ]  (Per-partition 
>> watermarks in FlinkKafkaConsumer should consider idle partitions) be 
>> included in 1.6? As we are seeing more users with this issue on the mailing 
>> lists.
>> 
>> Thanks.
>> Ben
>> 
>> 2018-06-05 5:29 GMT+08:00 Che Lui Shum :
>>> Hi Stephan,
>>> 
>>> Will FLINK-7129 (Support dynamically changing CEP patterns) be included in 
>>> 1.6? There were discussions about possibly including it in 1.6: 
>>> http://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3cCAMq=ou7gru2o9jtowxn1lc1f7nkcxayn6a3e58kxctb4b50...@mail.gmail.com%3e
>>> 
>>> Thanks,
>>> Shirley Shum
>>> 
>>> 
>>> From: Stephan Ewen 
>>> To: d...@flink.apache.org, user 
>>> Date: 06/04/2018 02:21 AM
>>> Subject: [DISCUSS] Flink 1.6 features
>>> 
>>> 
>>> 
>>> 
>>> Hi Flink Community!
>>> 
>>> The release of Apache Flink 1.5 has happened (yay!) - so it is a good time 
>>> to start talking about what to do for release 1.6.
>>> 
>>> == Suggested release timeline ==
>>> 
>>> I would propose to release around end of July (that is 8-9 weeks from now).
>>> 
>>> The rational behind that: There was a lot of effort in release testing 
>>> automation (end-to-end tests, scripted stress tests) as part of release 
>>> 1.5. You may have noticed the big set of new modules under 
>>> "flink-end-to-end-tests" in the Flink repository. It delayed the 1.5 
>>> release a bit, and needs to continue as part of the coming release cycle, 
>>> but should help make releasing more lightweight from now on.
>>> 
>>> (Side note: There are also some nightly stress tests that we created and 
>>> run at data Artisans, and where we are looking whether and in which way it 
>>> would make sense to contribute them to Flink.)
>>> 
>>> == Features and focus areas ==
>>> 
>>> We had a lot of big and heavy features in Flink 1.5, with FLIP-6, the new 
>>> network stack, recovery, SQL joins and client, ... Following something like 
>>> a "tick-tock-model", I would suggest to focus the next release more on 
>>> integrations, tooling, and reducing user friction. 
>>> 
>>> Of course, this does not mean that no other pull request gets reviewed, an 
>>> no other topic will be examined - it is simply meant as a help to 
>>> understand where to expect more activity during the next release cycle. 
>>> Note that these are really the coarse focus areas - don't read this as a 
>>> comprehensive list.
>>> 
>>> This list is my first suggestion, based on discussions with committers, 
>>> users, and mailing list questions.
>>> 
>>>   - Support Java 9 and Scala 2.12
>>>   
>>>   - Smoothen the integration in Container environment, like "Flink as a 
>>> Library", and easier integration with Kubernetes services and other proxies.
>>>   
>>>   - Polish the remaing parts of the FLIP-6 rewrite
>>> 
>>>   - Improve state backends with asynchronous timer snapshots, efficient 
>>> timer deletes, state TTL, and broadcast state support in RocksDB.
>>> 
>>>   - Extends Streaming Sinks:
>>>  - Bucketing Sink should support S3 properly (compensate for eventual 
>>> consistency), work with Flink's shaded S3 file systems, and efficiently 
>>> support formats that compress/index arcoss individual rows (Parquet, ORC, 
>>> ...)
>>>  - Support ElasticSearch's new REST API
>>> 
>>>   - Smoothen State Evolution to support type conversion on snapshot restore
>>>   
>>>   - Enhance Stream SQL and CEP
>>>  - Add support for "update by key" Table Sources
>>>  - Add more table sources and sinks (Kafka, Kinesis, Files, K/V stores)
>>>  - Expand SQL client
>>>  - Integrate CEP and SQL, through MATCH_RECOGNIZE clause
>>>  - Improve CEP Performance of SharedBuffer on RocksDB
>>> 
>>> 
>>> 
>> 


guidelines for setting parallelism in operations/job?

2018-06-05 Thread chrisr123
Hello,
I'm trying to find some simple rules or guidelines for what values to set for
operator or job parallelism. It would seem to me that it should be a number <=
the number of available task slots?

For example, suppose I have 2 task manager machines, each with 4 task slots.
Assuming no other jobs are running on the cluster, would I set the parallelism
for operations like filter and map to 8? If not, what would be a reasonable
number?

What happens if you request more parallelism than there are task slots? In the
example above, what happens if I set the parallelism to 12 on the operations?
I'm assuming it would just use as many as are available?

Also, it would seem that you would not want to hardcode the parallelism into
your source code, since you would only have a rough idea of the available task
slots when you submit the job?
Should you set the parallelism of all operators to roughly the same value or
to different values, and what would guide that decision?

Thanks!
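
For reference, a minimal sketch of where parallelism can be set (the pipeline,
the host/port, and the concrete values below are illustrative assumptions, not
a recommendation): the job-wide default is usually supplied at submission time
("flink run -p 8 ...") or via the environment, and individual operators can
override it with setParallelism().

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-wide default parallelism; often omitted in code and passed as
        // "flink run -p 8 ..." when submitting instead.
        env.setParallelism(8);

        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        lines.filter(line -> !line.isEmpty())          // inherits the job default (8)
             .map(new MapFunction<String, String>() {
                 @Override
                 public String map(String value) {
                     return value.toUpperCase();
                 }
             }).setParallelism(4)                      // per-operator override
             .print();

        env.execute("parallelism-sketch");
    }
}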




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [DISCUSS] Flink 1.6 features

2018-06-05 Thread Hao Sun
adding my vote to K8S Job mode, maybe it is this?
> Smoothen the integration in Container environment, like "Flink as a
Library", and easier integration with Kubernetes services and other proxies.



On Mon, Jun 4, 2018 at 11:01 PM Ben Yan  wrote:

> Hi Stephan,
>
> Will  [ https://issues.apache.org/jira/browse/FLINK-5479
>  ]
> (Per-partition watermarks in FlinkKafkaConsumer should consider idle
> partitions) be included in 1.6? As we are seeing more users with this
> issue on the mailing lists.
>
> Thanks.
> Ben
>
> 2018-06-05 5:29 GMT+08:00 Che Lui Shum :
>
>> Hi Stephan,
>>
>> Will FLINK-7129 (Support dynamically changing CEP patterns) be included
>> in 1.6? There were discussions about possibly including it in 1.6:
>>
>> http://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3cCAMq=ou7gru2o9jtowxn1lc1f7nkcxayn6a3e58kxctb4b50...@mail.gmail.com%3e
>> 
>>
>> Thanks,
>> Shirley Shum
>>
>>
>> From: Stephan Ewen 
>> To: d...@flink.apache.org, user 
>> Date: 06/04/2018 02:21 AM
>> Subject: [DISCUSS] Flink 1.6 features
>> --
>>
>>
>>
>> Hi Flink Community!
>>
>> The release of Apache Flink 1.5 has happened (yay!) - so it is a good
>> time to start talking about what to do for release 1.6.
>>
>> *== Suggested release timeline ==*
>>
>> I would propose to release around *end of July* (that is 8-9 weeks from
>> now).
>>
>> The rational behind that: There was a lot of effort in release testing
>> automation (end-to-end tests, scripted stress tests) as part of release
>> 1.5. You may have noticed the big set of new modules under
>> "flink-end-to-end-tests" in the Flink repository. It delayed the 1.5
>> release a bit, and needs to continue as part of the coming release cycle,
>> but should help make releasing more lightweight from now on.
>>
>> (Side note: There are also some nightly stress tests that we created and
>> run at data Artisans, and where we are looking whether and in which way it
>> would make sense to contribute them to Flink.)
>>
>> *== Features and focus areas ==*
>>
>> We had a lot of big and heavy features in Flink 1.5, with FLIP-6, the new
>> network stack, recovery, SQL joins and client, ... Following something like
>> a "tick-tock-model", I would suggest to focus the next release more on
>> integrations, tooling, and reducing user friction.
>>
>> Of course, this does not mean that no other pull request gets reviewed,
>> an no other topic will be examined - it is simply meant as a help to
>> understand where to expect more activity during the next release cycle.
>> Note that these are really the coarse focus areas - don't read this as a
>> comprehensive list.
>>
>> This list is my first suggestion, based on discussions with committers,
>> users, and mailing list questions.
>>
>>   - Support Java 9 and Scala 2.12
>>
>>   - Smoothen the integration in Container environment, like "Flink as a
>> Library", and easier integration with Kubernetes services and other proxies.
>>
>>   - Polish the remaing parts of the FLIP-6 rewrite
>>
>>   - Improve state backends with asynchronous timer snapshots, efficient
>> timer deletes, state TTL, and broadcast state support in RocksDB.
>>
>>   - Extends Streaming Sinks:
>>  - Bucketing Sink should support S3 properly (compensate for eventual
>> consistency), work with Flink's shaded S3 file systems, and efficiently
>> support formats that compress/index arcoss individual rows (Parquet, ORC,
>> ...)
>>  - Support ElasticSearch's new REST API
>>
>>   - Smoothen State Evolution to support type conversion on snapshot
>> restore
>>
>>   - Enhance Stream SQL and CEP
>>  - Add support for "update by key" Table Sources
>>  - Add more table sources and sinks (Kafka, Kinesis, Files, K/V
>> stores)
>>  - Expand SQL client
>>  - Integrate CEP and SQL, through MATCH_RECOGNIZE clause
>>  - Improve CEP Performance of SharedBuffer on RocksDB
>>
>>
>>
>>
>


Re: Implementing a “join” between a DataStream and a “set of rules”

2018-06-05 Thread Sandybayev, Turar (CAI - Atlanta)
Hi Amit,

In my current approach the idea for updating rule set data was to have some 
kind of a "control" stream that will trigger an update to a local data 
structure, or a "control" event within the main data stream that will trigger 
the same.

Using external system like a cache or database is also an option, but that 
still will require some kind of a trigger to reload rule set or a single rule, 
in case of any updates to it.

Others have suggested using Flink managed state, but I'm still not sure whether 
that is a generally recommended approach in this scenario, as it seems like it 
was more meant for windowing-type processing instead?

Thanks,
Turar

On 6/5/18, 8:46 AM, "Amit Jain"  wrote:

Hi Sandybayev,

In the current state, Flink does not provide a solution to the
mentioned use case. However, there is open FLIP[1] [2] which has been
created to address the same.

I can see in your current approach, you are not able to update the
rule set data. I think you can update rule set data by building
DataStream around changelogs which are stored in message
queue/distributed file system.
OR
You can store rule set data in the external system where you can query
for incoming keys from Flink.

[1]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
[2]: https://issues.apache.org/jira/browse/FLINK-6131

On Tue, Jun 5, 2018 at 5:52 PM, Sandybayev, Turar (CAI - Atlanta)
 wrote:
> Hi,
>
>
>
> What is the best practice recommendation for the following use case? We 
need
> to match a stream against a set of “rules”, which are essentially a Flink
> DataSet concept. Updates to this “rules set" are possible but not 
frequent.
> Each stream event must be checked against all the records in “rules set”,
> and each match produces one or more events into a sink. Number of records 
in
> a rule set are in the 6 digit range.
>
>
>
> Currently we're simply loading rules into a local List of rules and using
> flatMap over an incoming DataStream. Inside flatMap, we're just iterating
> over a list comparing each event to each rule.
>
>
>
> To speed up the iteration, we can also split the list into several 
batches,
> essentially creating a list of lists, and creating a separate thread to
> iterate over each sub-list (using Futures in either Java or Scala).
>
>
>
> Questions:
>
> 1.Is there a better way to do this kind of a join?
>
> 2.If not, is it safe to add additional parallelism by creating
> new threads inside each flatMap operation, on top of what Flink is already
> doing?
>
>
>
> Thanks in advance!
>
> Turar
>
>




Re: Implementing a “join” between a DataStream and a “set of rules”

2018-06-05 Thread Sandybayev, Turar (CAI - Atlanta)
Hi Aljoscha,

Thank you, this seems like a match for this use case. Am I understanding 
correctly that since only MemoryStateBackend is available for broadcast state, 
the max amount possible is 5MB?

If I use Flink state mechanism for storing rules, I will still need to iterate 
through all rules inside of a flatMap, and there’s no higher-level join 
mechanism that I can employ, right? Is there any downside in trying to 
parallelize that iteration inside my user flatMap operation?

Thanks
Turar

From: Aljoscha Krettek 
Date: Tuesday, June 5, 2018 at 12:05 PM
To: Amit Jain 
Cc: "Sandybayev, Turar (CAI - Atlanta)" , 
"user@flink.apache.org" 
Subject: Re: Implementing a “join” between a DataStream and a “set of rules”

Hi,

you might be interested in this newly-introduced feature: 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/broadcast_state.html

Best,
Aljoscha


On 5. Jun 2018, at 14:46, Amit Jain 
mailto:aj201...@gmail.com>> wrote:

Hi Sandybayev,

In the current state, Flink does not provide a solution to the
mentioned use case. However, there is open FLIP[1] [2] which has been
created to address the same.

I can see in your current approach, you are not able to update the
rule set data. I think you can update rule set data by building
DataStream around changelogs which are stored in message
queue/distributed file system.
OR
You can store rule set data in the external system where you can query
for incoming keys from Flink.

[1]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
[2]: https://issues.apache.org/jira/browse/FLINK-6131

On Tue, Jun 5, 2018 at 5:52 PM, Sandybayev, Turar (CAI - Atlanta)
mailto:turar.sandyba...@coxautoinc.com>> wrote:

Hi,



What is the best practice recommendation for the following use case? We need
to match a stream against a set of “rules”, which are essentially a Flink
DataSet concept. Updates to this “rules set" are possible but not frequent.
Each stream event must be checked against all the records in “rules set”,
and each match produces one or more events into a sink. Number of records in
a rule set are in the 6 digit range.



Currently we're simply loading rules into a local List of rules and using
flatMap over an incoming DataStream. Inside flatMap, we're just iterating
over a list comparing each event to each rule.



To speed up the iteration, we can also split the list into several batches,
essentially creating a list of lists, and creating a separate thread to
iterate over each sub-list (using Futures in either Java or Scala).



Questions:

1.Is there a better way to do this kind of a join?

2.If not, is it safe to add additional parallelism by creating
new threads inside each flatMap operation, on top of what Flink is already
doing?



Thanks in advance!

Turar




Re: Implementing a “join” between a DataStream and a “set of rules”

2018-06-05 Thread Sandybayev, Turar (CAI - Atlanta)
Thanks Garvit for your suggestion!

From: Garvit Sharma 
Date: Tuesday, June 5, 2018 at 8:44 AM
To: "Sandybayev, Turar (CAI - Atlanta)" 
Cc: "user@flink.apache.org" 
Subject: Re: Implementing a “join” between a DataStream and a “set of rules”

Hi,

For the above use case, you should do the following :

1. Convert your DataStream into KeyedDataStream by defining a key which would 
be used to get validated against your rules.
2. Same as 1 for rules stream.
3. Join the two keyedStreams using Flink's connect operator.
4. Store the rules into Flink's internal state, i.e. Flink's managed keyed state.
5. Validate the data coming in the dataStream against the managed keyed state.

Refer to [1] [2] for more details.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/state.html



On Tue, Jun 5, 2018 at 5:52 PM, Sandybayev, Turar (CAI - Atlanta) 
mailto:turar.sandyba...@coxautoinc.com>> wrote:
Hi,

What is the best practice recommendation for the following use case? We need to 
match a stream against a set of “rules”, which are essentially a Flink DataSet 
concept. Updates to this “rules set" are possible but not frequent. Each stream 
event must be checked against all the records in “rules set”, and each match 
produces one or more events into a sink. Number of records in a rule set are in 
the 6 digit range.

Currently we're simply loading rules into a local List of rules and using 
flatMap over an incoming DataStream. Inside flatMap, we're just iterating over 
a list comparing each event to each rule.

To speed up the iteration, we can also split the list into several batches, 
essentially creating a list of lists, and creating a separate thread to iterate 
over each sub-list (using Futures in either Java or Scala).

Questions:
1.Is there a better way to do this kind of a join?
2.If not, is it safe to add additional parallelism by creating new 
threads inside each flatMap operation, on top of what Flink is already doing?

Thanks in advance!
Turar




--

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination that 
makes him master.


IoT Use Case, Problem and Thoughts

2018-06-05 Thread ashish pok
Fabian, Stephan, All,
I started a discussion a while back around having a form of event-based
checkpointing policy that would help us in some of our high-volume data
pipelines. Here is an effort to put this in front of the community and
understand what capabilities can support these types of use cases, how much
others feel the same need, and whether this could potentially become a feature
that makes it to a user story.

Use Case Summary:
- Extremely high volume of data (events from consumer devices with a customer
base of over 100M)
- Multiple events need to be combined using a windowing streaming app grouped
by keys (something like a 5 min floor of the timestamp plus unique identifiers
for customer devices)
- "Most" events for a group/key arrive within a few seconds if not
milliseconds, however events can sometimes be delayed or get lost in transport
(so delayed event handling and timeouts will be needed)
- Extremely low (pretty vague, but hopefully the details below clarify it more)
data loss is acceptable
- Because of the volume and the transient nature of the source, checkpointing
is turned off (this saves on writes to persistence, as states/sessions are
active for only a few seconds during processing)

Problem Summary:
Of course, none of the above is out of the norm for Flink, and as a matter of
fact we already have a Flink app doing this. The issue arises when it comes to
graceful shutdowns and operator failures (e.g. Kafka timeouts etc.). On
operator failures, the entire job graph restarts, which essentially flushes out
in-memory states/sessions. I think there is a feature in the works (not sure if
it made it to 1.5) to perform selective restarts, which will limit the damage
but will still result in data loss. It also doesn't help when application
restarts are needed. We did try the savepoint route for explicit restart needs,
but I think MemoryBackedState ran into issues for larger states or something
along those lines (not certain). We obviously cannot recover an operator that
actually fails, because its own state could be unrecoverable. However, it feels
like Flink already has a lot of the plumbing to help with the overall problem
of providing some sort of recoverable state to handle graceful shutdowns and
restarts with minimal data loss.

Solutions:
Some in the community commented on my last email with decent ideas, like having
an event-based checkpointing trigger (on shutdown, on restart, etc.) or
life-cycle hooks (onCancel, onRestart, etc.) in Functions that can be
implemented if this type of behavior is needed.

I would appreciate feedback from the community on how useful this might be for
others, and from the core contributors on their thoughts as well.

Thanks in advance,
Ashish


Re: Do I still need hadoop-aws libs when using Flink 1.5 and Presto?

2018-06-05 Thread Hao Sun
After I added these to my flink-conf.yml, everything works now.

s3.sse.enabled: true
s3.sse.type: S3

Thanks for the help!
In general, I also want to know which config keys I can use for presto-s3.


On Tue, Jun 5, 2018 at 11:43 AM Hao Sun  wrote:

> also a follow up question. Can I use all properties here? Should I remove
> `hive.` for all the keys?
>
> https://prestodb.io/docs/current/connector/hive.html#hive-configuration-properties
>
> More specifically how I configure sse for s3?
>
> On Tue, Jun 5, 2018 at 11:33 AM Hao Sun  wrote:
>
>> I do not have the S3A lib requirement anymore, but I got a new error.
>>
>> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>> Access Denied
>>
>> Here are more logs:
>> https://gist.github.com/zenhao/5f22ca084717bae3194555398dad332d
>>
>> Thanks
>>
>> On Tue, Jun 5, 2018 at 9:39 AM Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> sorry, yes, you don't have to add any of the Hadoop dependencies.
>>> Everything that's needed comes in the presto s3 jar.
>>>
>>> You should use "s3:" as the prefix, the Presto S3 filesystem will not be
>>> used if you use s3a. And yes, you add config values to the flink config as
>>> s3.xxx.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 5. Jun 2018, at 18:23, Hao Sun  wrote:
>>>
>>> Thanks for pick up my question. I had s3a in the config now I removed it.
>>> I will post a full trace soon, but want to get some questions answered
>>> to help me understand this better.
>>>
>>> 1. Can I use the presto lib with Flink 1.5 without bundled hdp? Can I
>>> use this?
>>> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.0/flink-1.5.0-bin-scala_2.11.tgz
>>> 
>>> 2. How do I configure presto for endpoints, encryption? The S3A file
>>> system needed core-site.yml to configure such things and S3 V4 signature.
>>> Do I have to do it for presto?
>>> 3. If yes, how to do it? Just add s3.xxx to flink-config?
>>> like s3.server-side-encryption-algorithm: AES256
>>> s3.endpoint: 's3.amazonaws.com
>>> ' other
>>> values for France regions, etc
>>>
>>> I will post more logs when I get one. Thanks
>>>
>>> On Tue, Jun 5, 2018 at 9:09 AM Aljoscha Krettek 
>>> wrote:
>>>
 Hi,

 what are you using as the FileSystem scheme? s3 or s3a?

 Also, could you also post the full stack trace, please?

 Best,
 Aljoscha


 On 2. Jun 2018, at 07:34, Hao Sun  wrote:

 I am trying to figure out how to use S3 as state storage.
 The recommended way is
 https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
 

 Seems like I only have to do two things:
 *1. Put flink-s3-fs-presto to the lib*
 *2. Configure *

 s3.access-key: your-access-key
 s3.secret-key: your-secret-key


 But I see this exception: ClassNotFoundException:
 NativeS3FileSystem/S3AFileSystem Not Found


 https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#provide-s3-filesystem-dependency
 

 Add it is suggested to add more libs.
 So I am confused here, is there a step 3 needed? Isn't the presto jar
 is all self contained?

 Thanks



>>>


Re: Do I still need hadoop-aws libs when using Flink 1.5 and Presto?

2018-06-05 Thread Hao Sun
also a follow up question. Can I use all properties here? Should I remove
`hive.` for all the keys?
https://prestodb.io/docs/current/connector/hive.html#hive-configuration-properties

More specifically how I configure sse for s3?

On Tue, Jun 5, 2018 at 11:33 AM Hao Sun  wrote:

> I do not have the S3A lib requirement anymore, but I got a new error.
>
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> Access Denied
>
> Here are more logs:
> https://gist.github.com/zenhao/5f22ca084717bae3194555398dad332d
>
> Thanks
>
> On Tue, Jun 5, 2018 at 9:39 AM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> sorry, yes, you don't have to add any of the Hadoop dependencies.
>> Everything that's needed comes in the presto s3 jar.
>>
>> You should use "s3:" as the prefix, the Presto S3 filesystem will not be
>> used if you use s3a. And yes, you add config values to the flink config as
>> s3.xxx.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 5. Jun 2018, at 18:23, Hao Sun  wrote:
>>
>> Thanks for pick up my question. I had s3a in the config now I removed it.
>> I will post a full trace soon, but want to get some questions answered to
>> help me understand this better.
>>
>> 1. Can I use the presto lib with Flink 1.5 without bundled hdp? Can I use
>> this?
>> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.0/flink-1.5.0-bin-scala_2.11.tgz
>> 
>> 2. How do I configure presto for endpoints, encryption? The S3A file
>> system needed core-site.yml to configure such things and S3 V4 signature.
>> Do I have to do it for presto?
>> 3. If yes, how to do it? Just add s3.xxx to flink-config?
>> like s3.server-side-encryption-algorithm: AES256
>> s3.endpoint: 's3.amazonaws.com
>> ' other
>> values for France regions, etc
>>
>> I will post more logs when I get one. Thanks
>>
>> On Tue, Jun 5, 2018 at 9:09 AM Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> what are you using as the FileSystem scheme? s3 or s3a?
>>>
>>> Also, could you also post the full stack trace, please?
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 2. Jun 2018, at 07:34, Hao Sun  wrote:
>>>
>>> I am trying to figure out how to use S3 as state storage.
>>> The recommended way is
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
>>> 
>>>
>>> Seems like I only have to do two things:
>>> *1. Put flink-s3-fs-presto to the lib*
>>> *2. Configure *
>>>
>>> s3.access-key: your-access-key
>>> s3.secret-key: your-secret-key
>>>
>>>
>>> But I see this exception: ClassNotFoundException:
>>> NativeS3FileSystem/S3AFileSystem Not Found
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#provide-s3-filesystem-dependency
>>> 
>>>
>>> Add it is suggested to add more libs.
>>> So I am confused here, is there a step 3 needed? Isn't the presto jar is
>>> all self contained?
>>>
>>> Thanks
>>>
>>>
>>>
>>


Re: Do I still need hadoop-aws libs when using Flink 1.5 and Presto?

2018-06-05 Thread Hao Sun
I do not have the S3A lib requirement anymore, but I got a new error.

org.apache.flink.fs.s3presto.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Access Denied

Here are more logs:
https://gist.github.com/zenhao/5f22ca084717bae3194555398dad332d

Thanks

On Tue, Jun 5, 2018 at 9:39 AM Aljoscha Krettek  wrote:

> Hi,
>
> sorry, yes, you don't have to add any of the Hadoop dependencies.
> Everything that's needed comes in the presto s3 jar.
>
> You should use "s3:" as the prefix, the Presto S3 filesystem will not be
> used if you use s3a. And yes, you add config values to the flink config as
> s3.xxx.
>
> Best,
> Aljoscha
>
>
> On 5. Jun 2018, at 18:23, Hao Sun  wrote:
>
> Thanks for pick up my question. I had s3a in the config now I removed it.
> I will post a full trace soon, but want to get some questions answered to
> help me understand this better.
>
> 1. Can I use the presto lib with Flink 1.5 without bundled hdp? Can I use
> this?
> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.0/flink-1.5.0-bin-scala_2.11.tgz
> 
> 2. How do I configure presto for endpoints, encryption? The S3A file
> system needed core-site.yml to configure such things and S3 V4 signature.
> Do I have to do it for presto?
> 3. If yes, how to do it? Just add s3.xxx to flink-config?
> like s3.server-side-encryption-algorithm: AES256
> s3.endpoint: 's3.amazonaws.com
> ' other values
> for France regions, etc
>
> I will post more logs when I get one. Thanks
>
> On Tue, Jun 5, 2018 at 9:09 AM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> what are you using as the FileSystem scheme? s3 or s3a?
>>
>> Also, could you also post the full stack trace, please?
>>
>> Best,
>> Aljoscha
>>
>>
>> On 2. Jun 2018, at 07:34, Hao Sun  wrote:
>>
>> I am trying to figure out how to use S3 as state storage.
>> The recommended way is
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
>> 
>>
>> Seems like I only have to do two things:
>> *1. Put flink-s3-fs-presto to the lib*
>> *2. Configure *
>>
>> s3.access-key: your-access-key
>> s3.secret-key: your-secret-key
>>
>>
>> But I see this exception: ClassNotFoundException:
>> NativeS3FileSystem/S3AFileSystem Not Found
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#provide-s3-filesystem-dependency
>> 
>>
>> Add it is suggested to add more libs.
>> So I am confused here, is there a step 3 needed? Isn't the presto jar is
>> all self contained?
>>
>> Thanks
>>
>>
>>
>


Re: Do I still need hadoop-aws libs when using Flink 1.5 and Presto?

2018-06-05 Thread Aljoscha Krettek
Hi,

sorry, yes, you don't have to add any of the Hadoop dependencies. Everything 
that's needed comes in the presto s3 jar.

You should use "s3:" as the prefix, the Presto S3 filesystem will not be used 
if you use s3a. And yes, you add config values to the flink config as s3.xxx.

Best,
Aljoscha
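
For illustration, a minimal sketch of pointing the state backend at S3 once
flink-s3-fs-presto is in lib/ and s3.access-key / s3.secret-key are set in
flink-conf.yaml (the bucket name and path are placeholder assumptions):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3CheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000);  // checkpoint every minute

        // "s3://" (not "s3a://") so that the shaded Presto S3 filesystem is used
        env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints"));

        // ... define sources/operators/sinks and call env.execute(...) ...
    }
}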

> On 5. Jun 2018, at 18:23, Hao Sun  wrote:
> 
> Thanks for pick up my question. I had s3a in the config now I removed it.
> I will post a full trace soon, but want to get some questions answered to 
> help me understand this better.
> 
> 1. Can I use the presto lib with Flink 1.5 without bundled hdp? Can I use 
> this? 
> http://www.apache.org/dyn/closer.lua/flink/flink-1.5.0/flink-1.5.0-bin-scala_2.11.tgz
>  
> 
> 2. How do I configure presto for endpoints, encryption? The S3A file system 
> needed core-site.yml to configure such things and S3 V4 signature. Do I have 
> to do it for presto?
> 3. If yes, how to do it? Just add s3.xxx to flink-config? 
> like s3.server-side-encryption-algorithm: AES256
> s3.endpoint: 's3.amazonaws.com ' other values 
> for France regions, etc
> 
> I will post more logs when I get one. Thanks
> 
> On Tue, Jun 5, 2018 at 9:09 AM Aljoscha Krettek  > wrote:
> Hi,
> 
> what are you using as the FileSystem scheme? s3 or s3a?
> 
> Also, could you also post the full stack trace, please?
> 
> Best,
> Aljoscha
> 
> 
>> On 2. Jun 2018, at 07:34, Hao Sun > > wrote:
>> 
>> I am trying to figure out how to use S3 as state storage.
>> The recommended way is 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
>>  
>> 
>> 
>> Seems like I only have to do two things:
>> 1. Put flink-s3-fs-presto to the lib
>> 2. Configure
>> s3.access-key: your-access-key
>> s3.secret-key: your-secret-key
>> 
>> But I see this exception: ClassNotFoundException: 
>> NativeS3FileSystem/S3AFileSystem Not Found
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#provide-s3-filesystem-dependency
>>  
>> 
>> 
>> Add it is suggested to add more libs.
>> So I am confused here, is there a step 3 needed? Isn't the presto jar is all 
>> self contained?
>> 
>> Thanks
> 



Re: Do I still need hadoop-aws libs when using Flink 1.5 and Presto?

2018-06-05 Thread Hao Sun
Thanks for picking up my question. I had s3a in the config; I have now removed it.
I will post a full trace soon, but I want to get some questions answered first to
help me understand this better.

1. Can I use the presto lib with Flink 1.5 without bundled hdp? Can I use
this?
http://www.apache.org/dyn/closer.lua/flink/flink-1.5.0/flink-1.5.0-bin-scala_2.11.tgz
2. How do I configure presto for endpoints and encryption? The S3A file system
needed core-site.yml to configure such things and the S3 V4 signature. Do I
have to do that for presto?
3. If yes, how do I do it? Just add s3.xxx to the flink config,
like s3.server-side-encryption-algorithm: AES256 and
s3.endpoint: 's3.amazonaws.com' (with other values for France regions, etc.)?

I will post more logs when I get one. Thanks

On Tue, Jun 5, 2018 at 9:09 AM Aljoscha Krettek  wrote:

> Hi,
>
> what are you using as the FileSystem scheme? s3 or s3a?
>
> Also, could you also post the full stack trace, please?
>
> Best,
> Aljoscha
>
>
> On 2. Jun 2018, at 07:34, Hao Sun  wrote:
>
> I am trying to figure out how to use S3 as state storage.
> The recommended way is
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
> 
>
> Seems like I only have to do two things:
> *1. Put flink-s3-fs-presto to the lib*
> *2. Configure *
>
> s3.access-key: your-access-key
> s3.secret-key: your-secret-key
>
>
> But I see this exception: ClassNotFoundException:
> NativeS3FileSystem/S3AFileSystem Not Found
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#provide-s3-filesystem-dependency
> 
>
> Add it is suggested to add more libs.
> So I am confused here, is there a step 3 needed? Isn't the presto jar is
> all self contained?
>
> Thanks
>
>
>


Re: Do I still need hadoop-aws libs when using Flink 1.5 and Presto?

2018-06-05 Thread Aljoscha Krettek
Hi,

what are you using as the FileSystem scheme? s3 or s3a?

Also, could you also post the full stack trace, please?

Best,
Aljoscha

> On 2. Jun 2018, at 07:34, Hao Sun  wrote:
> 
> I am trying to figure out how to use S3 as state storage.
> The recommended way is 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended
>  
> 
> 
> Seems like I only have to do two things:
> 1. Put flink-s3-fs-presto to the lib
> 2. Configure
> s3.access-key: your-access-key
> s3.secret-key: your-secret-key
> 
> But I see this exception: ClassNotFoundException: 
> NativeS3FileSystem/S3AFileSystem Not Found
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#provide-s3-filesystem-dependency
>  
> 
> 
> Add it is suggested to add more libs.
> So I am confused here, is there a step 3 needed? Isn't the presto jar is all 
> self contained?
> 
> Thanks



Re: Implementing a “join” between a DataStream and a “set of rules”

2018-06-05 Thread Aljoscha Krettek
Hi,

you might be interested in this newly-introduced feature: 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/broadcast_state.html
 


Best,
Aljoscha
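
For illustration, a minimal sketch of the broadcast state pattern from that page,
applied to the rules use case (Event, Rule, Alert and their getKey()/getId()/
matches() methods are illustrative assumptions; every event is checked against
all rules currently in broadcast state):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

public class BroadcastRulesSketch {

    static final MapStateDescriptor<String, Rule> RULES_DESCRIPTOR =
            new MapStateDescriptor<>("rules", String.class, Rule.class);

    public static class RuleEvaluator
            extends KeyedBroadcastProcessFunction<String, Event, Rule, Alert> {

        @Override
        public void processElement(Event event, ReadOnlyContext ctx,
                                   Collector<Alert> out) throws Exception {
            // check the event against every rule currently in broadcast state
            for (Map.Entry<String, Rule> entry :
                    ctx.getBroadcastState(RULES_DESCRIPTOR).immutableEntries()) {
                if (entry.getValue().matches(event)) {
                    out.collect(new Alert(event, entry.getValue()));
                }
            }
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx,
                                            Collector<Alert> out) throws Exception {
            // rule updates are broadcast to all parallel instances
            ctx.getBroadcastState(RULES_DESCRIPTOR).put(rule.getId(), rule);
        }
    }

    // Wiring:
    // events.keyBy(e -> e.getKey())
    //       .connect(rules.broadcast(RULES_DESCRIPTOR))
    //       .process(new RuleEvaluator())
    //       .addSink(...);
}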

> On 5. Jun 2018, at 14:46, Amit Jain  wrote:
> 
> Hi Sandybayev,
> 
> In the current state, Flink does not provide a solution to the
> mentioned use case. However, there is open FLIP[1] [2] which has been
> created to address the same.
> 
> I can see in your current approach, you are not able to update the
> rule set data. I think you can update rule set data by building
> DataStream around changelogs which are stored in message
> queue/distributed file system.
> OR
> You can store rule set data in the external system where you can query
> for incoming keys from Flink.
> 
> [1]: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
> [2]: https://issues.apache.org/jira/browse/FLINK-6131
> 
> On Tue, Jun 5, 2018 at 5:52 PM, Sandybayev, Turar (CAI - Atlanta)
>  wrote:
>> Hi,
>> 
>> 
>> 
>> What is the best practice recommendation for the following use case? We need
>> to match a stream against a set of “rules”, which are essentially a Flink
>> DataSet concept. Updates to this “rules set" are possible but not frequent.
>> Each stream event must be checked against all the records in “rules set”,
>> and each match produces one or more events into a sink. Number of records in
>> a rule set are in the 6 digit range.
>> 
>> 
>> 
>> Currently we're simply loading rules into a local List of rules and using
>> flatMap over an incoming DataStream. Inside flatMap, we're just iterating
>> over a list comparing each event to each rule.
>> 
>> 
>> 
>> To speed up the iteration, we can also split the list into several batches,
>> essentially creating a list of lists, and creating a separate thread to
>> iterate over each sub-list (using Futures in either Java or Scala).
>> 
>> 
>> 
>> Questions:
>> 
>> 1.Is there a better way to do this kind of a join?
>> 
>> 2.If not, is it safe to add additional parallelism by creating
>> new threads inside each flatMap operation, on top of what Flink is already
>> doing?
>> 
>> 
>> 
>> Thanks in advance!
>> 
>> Turar
>> 
>> 



Re: Cannot determine simple type name - [FLINK-7490]

2018-06-05 Thread rakeshchalasani
Thanks for the response, the issue does look similar to FLINK-9220.

The code is part of our application, so I have to come up with an example.
But following are the steps that will likely reproduce the issue.

1. Define UDF in module, say in com.udf
2. Create a topology using the above UDF in another module, say in org.test,
and start execution in org.test.

That should reproduce the issue. 

Please also note that I have only tested this using Local environment. 

Maybe diverging from this issue, but to work around it, I refactored my
code to put the UDF in the same module, while still keeping some references to
another module. This didn't work locally either, but it worked on the cluster.
Looking at the code, the user class loader also uses the path from the jar,
which is able to resolve the package on the cluster, but locally it cannot
resolve those class paths.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: PartitionNotFoundException after deployment

2018-06-05 Thread Nico Kruber
Hi Gyula,
as a follow-up, you may be interested in
https://issues.apache.org/jira/browse/FLINK-9413


Nico

On 04/05/18 15:36, Gyula Fóra wrote:
> Looks pretty clear that one operator takes too long to start (even on
> the UI it shows it in the created state for far too long). Any idea what
> might cause this delay? It actually often crashes on Akka ask timeout
> during scheduling the node.
> 
> Gyula
> 
> Piotr Nowojski wrote (on Fri, 4 May 2018 at 15:33):
> 
> Ufuk: I don’t know why.
> 
> +1 for your other suggestions.
> 
> Piotrek
> 
> > On 4 May 2018, at 14:52, Ufuk Celebi  > wrote:
> >
> > Hey Gyula!
> >
> > I'm including Piotr and Nico (cc'd) who have worked on the network
> > stack in the last releases.
> >
> > Registering the network structures including the intermediate results
> > actually happens **before** any state is restored. I'm not sure why
> > this reproducibly happens when you restore state. @Nico, Piotr: any
> > ideas here?
> >
> > In general I think what happens here is the following:
> > - a task requests the result of a local upstream producer, but that
> > one has not registered its intermediate result yet
> > - this should result in a retry of the request with some backoff
> > (controlled via the config params you mention
> > taskmanager.network.request-backoff.max,
> > taskmanager.network.request-backoff.initial)
> >
> > As a first step I would set logging to DEBUG and check the TM logs for
> > messages like "Retriggering partition request {}:{}."
> >
> > You can also check the SingleInputGate code which has the logic for
> > retriggering requests.
> >
> > – Ufuk
> >
> >
> > On Fri, May 4, 2018 at 10:27 AM, Gyula Fóra  > wrote:
> >> Hi Ufuk,
> >>
> >> Do you have any quick idea what could cause this problems in
> flink 1.4.2?
> >> Seems like one operator takes too long to deploy and downstream
> tasks error
> >> out on partition not found. This only seems to happen when the job is
> >> restored from state and in fact that operator has some keyed and
> operator
> >> state as well.
> >>
> >> Deploying the same job from empty state works well. We tried
> increasing the
> >> taskmanager.network.request-backoff.max that didnt help.
> >>
> >> It would be great if you have some pointers where to look
> further, I havent
> >> seen this happening before.
> >>
> >> Thank you!
> >> Gyula
> >>
> >> The error:
> >> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> >> Partition 4c5e9cd5dd410331103f51127996068a@b35ef4ffe25e3d17c5d6051ebe2860cd not found.
> >>    at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
> >>    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:115)
> >>    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:159)
> >>    at java.util.TimerThread.mainLoop(Timer.java:555)
> >>    at java.util.TimerThread.run(Timer.java:505)
> >
> >
> >
> > --
> > Data Artisans GmbH | Stresemannstr. 121a | 10963 Berlin
> >
> > i...@data-artisans.com 
> > +49-30-43208879 
> >
> > Registered at Amtsgericht Charlottenburg - HRB 158244 B
> > Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen





Re: Implementing a “join” between a DataStream and a “set of rules”

2018-06-05 Thread Amit Jain
Hi Sandybayev,

In the current state, Flink does not provide a solution to the
mentioned use case. However, there is open FLIP[1] [2] which has been
created to address the same.

I can see in your current approach, you are not able to update the
rule set data. I think you can update rule set data by building
DataStream around changelogs which are stored in message
queue/distributed file system.
OR
You can store rule set data in the external system where you can query
for incoming keys from Flink.

[1]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
[2]: https://issues.apache.org/jira/browse/FLINK-6131
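
A minimal sketch of the second option, querying an external system per incoming
key (ExternalRuleClient, its endpoint, and the Event/Rule/Alert types are
hypothetical placeholders, not a real library):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.List;

public class ExternalRuleLookup extends RichFlatMapFunction<Event, Alert> {

    private transient ExternalRuleClient client;  // hypothetical K/V store or cache client

    @Override
    public void open(Configuration parameters) {
        client = new ExternalRuleClient("rules-store:6379");  // hypothetical endpoint
    }

    @Override
    public void flatMap(Event event, Collector<Alert> out) {
        // look up the rules registered for this event's key in the external system
        List<Rule> rules = client.rulesFor(event.getKey());
        for (Rule rule : rules) {
            if (rule.matches(event)) {
                out.collect(new Alert(event, rule));
            }
        }
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}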

On Tue, Jun 5, 2018 at 5:52 PM, Sandybayev, Turar (CAI - Atlanta)
 wrote:
> Hi,
>
>
>
> What is the best practice recommendation for the following use case? We need
> to match a stream against a set of “rules”, which are essentially a Flink
> DataSet concept. Updates to this “rules set" are possible but not frequent.
> Each stream event must be checked against all the records in “rules set”,
> and each match produces one or more events into a sink. Number of records in
> a rule set are in the 6 digit range.
>
>
>
> Currently we're simply loading rules into a local List of rules and using
> flatMap over an incoming DataStream. Inside flatMap, we're just iterating
> over a list comparing each event to each rule.
>
>
>
> To speed up the iteration, we can also split the list into several batches,
> essentially creating a list of lists, and creating a separate thread to
> iterate over each sub-list (using Futures in either Java or Scala).
>
>
>
> Questions:
>
> 1.Is there a better way to do this kind of a join?
>
> 2.If not, is it safe to add additional parallelism by creating
> new threads inside each flatMap operation, on top of what Flink is already
> doing?
>
>
>
> Thanks in advance!
>
> Turar
>
>


Re: Implementing a “join” between a DataStream and a “set of rules”

2018-06-05 Thread Garvit Sharma
Hi,

For the above use case, you should do the following :

1. Convert your DataStream into KeyedDataStream by defining a key which
would be used to get validated against your rules.
2. Same as 1 for rules stream.
3. Join the two keyedStreams using Flink's connect operator.
4. Store the rules into Flink's internal state, i.e. Flink's managed keyed
state.
5. Validate the data coming in the dataStream against the managed keyed
state.

Refer to [1] [2] for more details.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/
operators/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/state.html
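
A minimal sketch of steps 1-5 above (Event, Rule, Alert, the key selectors and
the matches() method are illustrative assumptions; it also assumes events and
rules share a key, so each key holds at most one rule):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class RuleMatcher extends RichCoFlatMapFunction<Event, Rule, Alert> {

    // managed keyed state holding the rule for the current key (step 4)
    private transient ValueState<Rule> ruleState;

    @Override
    public void open(Configuration parameters) {
        ruleState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("rule", Rule.class));
    }

    @Override
    public void flatMap1(Event event, Collector<Alert> out) throws Exception {
        // validate the incoming event against the stored rule (step 5)
        Rule rule = ruleState.value();
        if (rule != null && rule.matches(event)) {
            out.collect(new Alert(event, rule));
        }
    }

    @Override
    public void flatMap2(Rule rule, Collector<Alert> out) throws Exception {
        // a rule arriving on the rules stream replaces the stored rule for this key
        ruleState.update(rule);
    }
}

// Wiring (steps 1-3):
// events.keyBy(e -> e.getKey())
//       .connect(rules.keyBy(r -> r.getKey()))
//       .flatMap(new RuleMatcher())
//       .addSink(...);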



On Tue, Jun 5, 2018 at 5:52 PM, Sandybayev, Turar (CAI - Atlanta) <
turar.sandyba...@coxautoinc.com> wrote:

> Hi,
>
>
>
> What is the best practice recommendation for the following use case? We
> need to match a stream against a set of “rules”, which are essentially a
> Flink DataSet concept. Updates to this “rules set" are possible but not
> frequent. Each stream event must be checked against all the records in
> “rules set”, and each match produces one or more events into a sink. Number
> of records in a rule set are in the 6 digit range.
>
>
>
> Currently we're simply loading rules into a local List of rules and using
> flatMap over an incoming DataStream. Inside flatMap, we're just iterating
> over a list comparing each event to each rule.
>
>
>
> To speed up the iteration, we can also split the list into several
> batches, essentially creating a list of lists, and creating a separate
> thread to iterate over each sub-list (using Futures in either Java or
> Scala).
>
>
>
> Questions:
>
> 1.Is there a better way to do this kind of a join?
>
> 2.If not, is it safe to add additional parallelism by creating
> new threads inside each flatMap operation, on top of what Flink is already
> doing?
>
>
>
> Thanks in advance!
>
> Turar
>
>
>



-- 

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination
that makes him master.


Implementing a “join” between a DataStream and a “set of rules”

2018-06-05 Thread Sandybayev, Turar (CAI - Atlanta)
Hi,

What is the best practice recommendation for the following use case? We need to 
match a stream against a set of “rules”, which are essentially a Flink DataSet 
concept. Updates to this “rules set" are possible but not frequent. Each stream 
event must be checked against all the records in “rules set”, and each match 
produces one or more events into a sink. The number of records in a rule set is in
the 6-digit range.

Currently we're simply loading rules into a local List of rules and using 
flatMap over an incoming DataStream. Inside flatMap, we're just iterating over 
a list comparing each event to each rule.

To speed up the iteration, we can also split the list into several batches, 
essentially creating a list of lists, and creating a separate thread to iterate 
over each sub-list (using Futures in either Java or Scala).

Questions:
1.Is there a better way to do this kind of a join?
2.If not, is it safe to add additional parallelism by creating new 
threads inside each flatMap operation, on top of what Flink is already doing?

Thanks in advance!
Turar



Re: Checkpointing on cluster shutdown

2018-06-05 Thread Chesnay Schepler
If a TM goes down, any data generated after the last successful
checkpoint cannot be guaranteed to be consistent across the cluster.
Hence, this data is discarded and we go back to the last known
consistent state, i.e. the last checkpoint that was successfully created.


On 05.06.2018 13:06, Garvit Sharma wrote:

But the job should be terminated gracefully. Why is this behavior not there?

On Tue, Jun 5, 2018 at 4:19 PM, Chesnay Schepler > wrote:


No checkpoint will be triggered when the cluster is shutdown. For
this case you will have to manually trigger a savepoint.

If a TM goes down it does not create a checkpoint. IN these cases
the job will be restarted from the last successful checkpoint.


On 05.06.2018 12:01, Data Engineer wrote:

Hi,

Suppose I have a working Flink cluster with 1 taskmanager and
1 jobmanager and I have enabled checkpointing with say an
interval of 1 minute.
Now if I shut down the Flink cluster in between checkpoints
(say for some upgrade), will the JobManager automatically
trigger a checkpoint before going down?

Or is it mandatory to manually trigger savepoints in these cases?
Also am I correct in my understanding that if a taskmanager
goes down first, there is no way the TaskManager can trigger
the checkpoint on its own?






--

Garvit Sharma
github.com/garvitlnmiit/ 

No Body is a Scholar by birth, its only hard work and strong 
determination that makes him master.





Re: Flink checkpoint failure issue

2018-06-05 Thread Chesnay Schepler

Can you provide us with the TaskManager logs?

On 05.06.2018 12:30, James (Jian Wu) [FDS Data Platform] wrote:


Hi:

  I am using Flink streaming continuous query.

  Scenario:

Kafka-connector to consume a topic, and streaming incremental 
calculate 24 hours window data. And use processingTime as 
TimeCharacteristic. I am using RocksDB as StateBackend, file system is 
HDFS, and checkpoint interval is 5 minutes.


env.setStreamTimeCharacteristic(TimeCharacteristic./ProcessingTime/);


RocksDBStateBackend rocksdb = new RocksDBStateBackend(checkPointPath, 
true);

rocksdb.setPredefinedOptions(PredefinedOptions./FLASH_SSD_OPTIMIZED/);

env.setStateBackend(rocksdb);


env.enableCheckpointing(checkPointInterval);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(checkPointInterval);

  After I run the application for several hours, the info log shows

2018-06-04 19:29:08,048 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator   - 
Triggering checkpoint 9 @ 1528108147985 for job 
33313f186b439312bd09e5672e8af661.


 but no corresponding "completed checkpoint" log follows, and the checkpoint fails.

 Via the web UI metrics, the Kafka commit offset stops increasing while the 
Kafka current offset keeps advancing; after waiting for 2 hours, Kafka stops 
consuming messages.


Then I enabled the debug log and tried to reproduce the issue.

*During the normal stage, the log shows the DFSClient sending data packets:*

2018-06-04 19:23:58,933 DEBUG org.apache.hadoop.hdfs.DFSClient 
- /flink/cps/33313f186b439312bd09e5672e8af661/chk-8: 
masked=rwxr-xr-x


2018-06-04 19:23:58,934 DEBUG org.apache.hadoop.ipc.Client 
- The ping interval is 6 ms.


2018-06-04 19:23:58,934 DEBUG org.apache.hadoop.ipc.Client 
- Connecting to fds-hadoop-prod30-mp/10.10.22.50:8020


2018-06-04 19:23:58,935 DEBUG org.apache.hadoop.ipc.Client 
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin: starting, having 
connections 1


2018-06-04 19:23:58,936 DEBUG org.apache.hadoop.ipc.Client 
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin sending #1709


2018-06-04 19:23:58,967 DEBUG org.apache.hadoop.ipc.Client 
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin got value #1709


2018-06-04 19:23:58,967 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine 
- Call: mkdirs took 33ms


2018-06-04 19:23:58,967 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator   - 
Triggering checkpoint 8 @ 1528107838933 for job 
33313f186b439312bd09e5672e8af661.


2018-06-04 19:24:00,054 DEBUG org.apache.hadoop.hdfs.DFSClient 
- 
/flink/cps/33313f186b439312bd09e5672e8af661/chk-8/_metadata: 
masked=rw-r--r--


2018-06-04 19:24:00,055 DEBUG org.apache.hadoop.ipc.Client 
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin sending #1710


2018-06-04 19:24:00,060 DEBUG org.apache.hadoop.ipc.Client 
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin got value #1710


2018-06-04 19:24:00,060 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine 
- Call: create took 6ms


2018-06-04 19:24:00,060 DEBUG org.apache.hadoop.hdfs.DFSClient 
- computePacketChunkSize: 
src=/flink/cps/33313f186b439312bd09e5672e8af661/chk-8/_metadata, 
chunkSize=516, chunksPerPacket=126, packetSize=65016


2018-06-04 19:24:00,061 DEBUG org.apache.hadoop.hdfs.LeaseRenewer 
- Lease renewer daemon for 
[DFSClient_NONMAPREDUCE_-866487647_111] with renew id 1 started


2018-06-04 19:24:00,061 DEBUG org.apache.hadoop.hdfs.DFSClient 
- DFSClient writeChunk allocating new packet seqno=0, 
src=/flink/cps/33313f186b439312bd09e5672e8af661/chk-8/_metadata, 
packetSize=65016, chunksPerPacket=126, bytesCurBlock=0


2018-06-04 19:24:00,061 DEBUG org.apache.hadoop.hdfs.DFSClient 
- DFSClient flush(): bytesCurBlock=6567 lastFlushOffset=0 
createNewBlock=false


2018-06-04 19:24:00,061 DEBUG org.apache.hadoop.hdfs.DFSClient 
- Queued packet 0


2018-06-04 19:24:00,061 DEBUG org.apache.hadoop.hdfs.DFSClient 
- Waiting for ack for: 0


2018-06-04 19:24:00,061 DEBUG org.apache.hadoop.hdfs.DFSClient 
- Allocating new block


2018-06-04 19:24:00,062 DEBUG org.apache.hadoop.ipc.Client 
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin sending #1711


2018-06-04 19:24:00,068 DEBUG org.apache.hadoop.ipc.Client 
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin got value #1711


2018-06-04 19:24:00,068 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine 
- Call: addBlock took 6ms



Re: Checkpointing on cluster shutdown

2018-06-05 Thread Garvit Sharma
But the job should be terminated gracefully. Why isn't this the behavior?

On Tue, Jun 5, 2018 at 4:19 PM, Chesnay Schepler  wrote:

> No checkpoint will be triggered when the cluster is shutdown. For this
> case you will have to manually trigger a savepoint.
>
> If a TM goes down it does not create a checkpoint. In these cases the job
> will be restarted from the last successful checkpoint.
>
>
> On 05.06.2018 12:01, Data Engineer wrote:
>
>> Hi,
>>
>> Suppose I have a working Flink cluster with 1 taskmanager and 1
>> jobmanager and I have enabled checkpointing with say an interval of 1
>> minute.
>> Now if I shut down the Flink cluster in between checkpoints (say for some
>> upgrade), will the JobManager automatically trigger a checkpoint before
>> going down?
>>
>> Or is it mandatory to manually trigger savepoints in these cases?
>> Also am I correct in my understanding that if a taskmanager goes down
>> first, there is no way the TaskManager can trigger the checkpoint on its
>> own?
>>
>>
>>
>


-- 

Garvit Sharma
github.com/garvitlnmiit/

No Body is a Scholar by birth, its only hard work and strong determination
that makes him master.


Re: How to read from Cassandra using Apache Flink?

2018-06-05 Thread Chesnay Schepler
You are creating an entirely new input format for each fetch, which includes 
setting up a connection to Cassandra. It is not surprising that this is 
slow.
The Cassandra input format was written for reading large amounts of 
data, not for synchronous single-row fetches.


You can try using the datastax driver (or some other library) directly. 
You should be able to re-use large portions of the CassandraInputFormat.


Note that you're also leaking the connection as you aren't closing the 
InputFormat.
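
For illustration, a minimal sketch of the direct-driver approach inside a
RichFlatMapFunction, which opens one session per parallel task and reuses it for
every lookup (contact point, keyspace/table, query and the String element type
are placeholder assumptions):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CassandraLookupFunction extends RichFlatMapFunction<String, String> {

    private transient Cluster cluster;
    private transient Session session;

    @Override
    public void open(Configuration parameters) {
        // Placeholder contact point; the cluster/session is reused for all records.
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        session = cluster.connect();
    }

    @Override
    public void flatMap(String id, Collector<String> out) {
        // Placeholder keyspace/table; one synchronous lookup per input record.
        ResultSet rs = session.execute("SELECT * FROM test.test WHERE id = ?", id);
        for (Row row : rs) {
            out.collect(row.toString());
        }
    }

    @Override
    public void close() {
        if (session != null) { session.close(); }
        if (cluster != null) { cluster.close(); }
    }
}

If throughput becomes an issue, combining the driver's asynchronous API with
Flink's AsyncDataStream is another option worth looking at.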


On 05.06.2018 12:02, HarshithBolar wrote:

My flink program should do a Cassandra look up for each input record and
based on the results, should do some further processing.

But I'm currently stuck at reading data from Cassandra. This is the code
snippet I've come up with so far.


ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
                .withPort(props.getCassandraPort())
                .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
                .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
                .build();
    }
};

for (int i = 1; i < 5; i++) {
    CassandraInputFormat<Tuple8<...>> cassandraInputFormat =
            new CassandraInputFormat<>("select * from test where id=hello" + i,
                    secureCassandraSinkClusterBuilder);
    cassandraInputFormat.configure(null);
    cassandraInputFormat.open(null);
    Tuple8<...> out = new Tuple8<>();
    cassandraInputFormat.nextRecord(out);
    System.out.println(out);
}

But the issue with this is, it takes nearly 10 seconds for each look up, in
other words, this for loop takes 50 seconds to execute.

How do I speed up this operation? Alternatively, is there any other way of
looking up Cassandra in Flink?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Checkpointing on cluster shutdown

2018-06-05 Thread Chesnay Schepler
No checkpoint will be triggered when the cluster is shut down. For this 
case you will have to manually trigger a savepoint.


If a TM goes down it does not create a checkpoint. In these cases the 
job will be restarted from the last successful checkpoint.


On 05.06.2018 12:01, Data Engineer wrote:

Hi,

Suppose I have a working Flink cluster with 1 taskmanager and 1 
jobmanager and I have enabled checkpointing with say an interval of 1 
minute.
Now if I shut down the Flink cluster in between checkpoints (say for 
some upgrade), will the JobManager automatically trigger a checkpoint 
before going down?


Or is it mandatory to manually trigger savepoints in these cases?
Also am I correct in my understanding that if a taskmanager goes down 
first, there is no way the TaskManager can trigger the checkpoint on 
its own?







Re: is there a config to ask taskmanager to keep retrying connect to jobmanager after Disassociated?

2018-06-05 Thread Chesnay Schepler
Please look into high-availability 
to make your cluster resistant against shutdowns.


On 05.06.2018 12:31, makeyang wrote:

can anybody share anythoughts, insights about this issue?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Writing stream to Hadoop

2018-06-05 Thread miki haiat
OMG i missed it ...

Thanks,

MIki

On Tue, Jun 5, 2018 at 1:30 PM Chesnay Schepler  wrote:

> This particular version of the method is deprecated, use 
> enableCheckpointing(long
> checkpointingInterval) instead.
>
> On 05.06.2018 12:19, miki haiat wrote:
>
> I saw the option of enabling checkpoint
> enabling-and-configuring-checkpointing
> 
>
> But on 1.5 it said that the method is deprecated so im a bit confused .
>
> /** @deprecated */
> @Deprecated
> @PublicEvolving
> public StreamExecutionEnvironment enableCheckpointing() {
>     this.checkpointCfg.setCheckpointInterval(500L);
>     return this;
> }
>
>
>
>
> On Tue, Jun 5, 2018 at 1:11 PM Kostas Kloudas 
> wrote:
>
>> Hi Miki,
>>
>> Have you enabled checkpointing?
>>
>> Kostas
>>
>> On Jun 5, 2018, at 11:14 AM, miki haiat  wrote:
>>
>> Im trying to write some data to Hadoop by using this code
>>
>> The state backend is set without time
>>
>> StateBackend sb = new 
>> FsStateBackend("hdfs://***:9000/flink/my_city/checkpoints");env.setStateBackend(sb);
>>
>> BucketingSink> sink =
>> new 
>> BucketingSink<>("hdfs://:9000/mycity/raw");sink.setBucketer(new 
>> DateTimeBucketer("-MM-dd--HHmm"));sink.setInactiveBucketCheckInterval(12);sink.setInactiveBucketThreshold(12);
>>
>> the result is that all the files are stuck in* in.programs  *status and
>> not closed.
>> is it related to the state backend configuration.
>>
>> thanks,
>>
>> Miki
>>
>>
>>
>


Re: is there a config to ask taskmanager to keep retrying connect to jobmanager after Disassociated?

2018-06-05 Thread makeyang
Can anybody share any thoughts or insights about this issue?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Writing stream to Hadoop

2018-06-05 Thread Chesnay Schepler
This particular version of the method is deprecated, use 
enableCheckpointing(long checkpointingInterval) instead.
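
For example, a minimal sketch with the non-deprecated variant (the 60-second
interval is only an illustrative value):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds; pending bucket files are moved to
        // finished when a checkpoint completes.
        env.enableCheckpointing(60_000L);

        // ... define the sources and the BucketingSink here, then:
        // env.execute("bucketing-job");
    }
}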


On 05.06.2018 12:19, miki haiat wrote:
I saw the option of enabling checkpoint 
enabling-and-configuring-checkpointing 
 


But on 1.5 it said that the method is deprecated so im a bit confused .
/** @deprecated */ @Deprecated
@PublicEvolving
public StreamExecutionEnvironment enableCheckpointing() {
 this.checkpointCfg.setCheckpointInterval(500L); return this; }



On Tue, Jun 5, 2018 at 1:11 PM Kostas Kloudas 
mailto:k.klou...@data-artisans.com>> wrote:


Hi Miki,

Have you enabled checkpointing?

Kostas


On Jun 5, 2018, at 11:14 AM, miki haiat mailto:miko5...@gmail.com>> wrote:

Im trying to write some data to Hadoop by using this code

The state backend is set without time
StateBackend sb =new 
FsStateBackend("hdfs://***:9000/flink/my_city/checkpoints"); 
env.setStateBackend(sb);
BucketingSink> sink =
 new BucketingSink<>("hdfs://:9000/mycity/raw"); sink.setBucketer(new 
DateTimeBucketer("-MM-dd--HHmm")); sink.setInactiveBucketCheckInterval(12); 
sink.setInactiveBucketThreshold(12);
the result is that all the files are stuck in*in.programs *status
and not closed.
is it related to the state backend configuration.

thanks,

Miki







Flink checkpoint failure issue

2018-06-05 Thread James (Jian Wu) [FDS Data Platform]
Hi:

  I am using Flink streaming continuous query.
  Scenario:
  Kafka-connector to consume a topic, and streaming incremental calculate 24 
hours window data. And use processingTime as TimeCharacteristic. I am using 
RocksDB as StateBackend, file system is HDFS, and checkpoint interval is 5 
minutes.

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);


RocksDBStateBackend rocksdb = new RocksDBStateBackend(checkPointPath, true);
rocksdb.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);

env.setStateBackend(rocksdb);


env.enableCheckpointing(checkPointInterval);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(checkPointInterval);

  After I run the application for several hours, the info log shows
2018-06-04 19:29:08,048 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 9 @ 1528108147985 for job 33313f186b439312bd09e5672e8af661.

 but no corresponding "completed checkpoint" log follows, and the checkpoint fails.
 Via the web UI metrics, the Kafka commit offset stops increasing while the Kafka 
current offset keeps advancing; after waiting for 2 hours, Kafka stops consuming messages.

Then I enabled the debug log and tried to reproduce the issue.

During the normal stage, the log shows the DFSClient sending data packets:


2018-06-04 19:23:58,933 DEBUG org.apache.hadoop.hdfs.DFSClient  
- /flink/cps/33313f186b439312bd09e5672e8af661/chk-8: 
masked=rwxr-xr-x
2018-06-04 19:23:58,934 DEBUG org.apache.hadoop.ipc.Client  
- The ping interval is 6 ms.
2018-06-04 19:23:58,934 DEBUG org.apache.hadoop.ipc.Client  
- Connecting to fds-hadoop-prod30-mp/10.10.22.50:8020
2018-06-04 19:23:58,935 DEBUG org.apache.hadoop.ipc.Client  
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin: starting, having 
connections 1
2018-06-04 19:23:58,936 DEBUG org.apache.hadoop.ipc.Client  
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin sending #1709
2018-06-04 19:23:58,967 DEBUG org.apache.hadoop.ipc.Client  
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin got value #1709
2018-06-04 19:23:58,967 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine   
- Call: mkdirs took 33ms
2018-06-04 19:23:58,967 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 8 @ 1528107838933 for job 33313f186b439312bd09e5672e8af661.
2018-06-04 19:24:00,054 DEBUG org.apache.hadoop.hdfs.DFSClient  
- /flink/cps/33313f186b439312bd09e5672e8af661/chk-8/_metadata: 
masked=rw-r--r--
2018-06-04 19:24:00,055 DEBUG org.apache.hadoop.ipc.Client  
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin sending #1710
2018-06-04 19:24:00,060 DEBUG org.apache.hadoop.ipc.Client  
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin got value #1710
2018-06-04 19:24:00,060 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine   
- Call: create took 6ms
2018-06-04 19:24:00,060 DEBUG org.apache.hadoop.hdfs.DFSClient  
- computePacketChunkSize: 
src=/flink/cps/33313f186b439312bd09e5672e8af661/chk-8/_metadata, chunkSize=516, 
chunksPerPacket=126, packetSize=65016
2018-06-04 19:24:00,061 DEBUG org.apache.hadoop.hdfs.LeaseRenewer   
- Lease renewer daemon for [DFSClient_NONMAPREDUCE_-866487647_111] 
with renew id 1 started
2018-06-04 19:24:00,061 DEBUG org.apache.hadoop.hdfs.DFSClient  
- DFSClient writeChunk allocating new packet seqno=0, 
src=/flink/cps/33313f186b439312bd09e5672e8af661/chk-8/_metadata, 
packetSize=65016, chunksPerPacket=126, bytesCurBlock=0
2018-06-04 19:24:00,061 DEBUG org.apache.hadoop.hdfs.DFSClient  
- DFSClient flush(): bytesCurBlock=6567 lastFlushOffset=0 
createNewBlock=false
2018-06-04 19:24:00,061 DEBUG org.apache.hadoop.hdfs.DFSClient  
- Queued packet 0
2018-06-04 19:24:00,061 DEBUG org.apache.hadoop.hdfs.DFSClient  
- Waiting for ack for: 0
2018-06-04 19:24:00,061 DEBUG org.apache.hadoop.hdfs.DFSClient  
- Allocating new block
2018-06-04 19:24:00,062 DEBUG org.apache.hadoop.ipc.Client  
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin sending #1711
2018-06-04 19:24:00,068 DEBUG org.apache.hadoop.ipc.Client  
- IPC Client (2045458324) connection to 
fds-hadoop-prod30-mp/10.10.22.50:8020 from fdsadmin got value #1711
2018-06-04 19:24:00,068 DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine   
   

Re: Writing stream to Hadoop

2018-06-05 Thread miki haiat
I saw the option of enabling checkpoint
enabling-and-configuring-checkpointing


But on 1.5 it says that the method is deprecated, so I'm a bit confused.

/** @deprecated */
@Deprecated
@PublicEvolving
public StreamExecutionEnvironment enableCheckpointing() {
this.checkpointCfg.setCheckpointInterval(500L);
return this;
}




On Tue, Jun 5, 2018 at 1:11 PM Kostas Kloudas 
wrote:

> Hi Miki,
>
> Have you enabled checkpointing?
>
> Kostas
>
> On Jun 5, 2018, at 11:14 AM, miki haiat  wrote:
>
> Im trying to write some data to Hadoop by using this code
>
> The state backend is set without time
>
> StateBackend sb = new 
> FsStateBackend("hdfs://***:9000/flink/my_city/checkpoints");
> env.setStateBackend(sb);
>
> BucketingSink> sink =
> new BucketingSink<>("hdfs://:9000/mycity/raw");
> sink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"));
> sink.setInactiveBucketCheckInterval(12);
> sink.setInactiveBucketThreshold(12);
>
> the result is that all the files are stuck in* in.programs  *status and
> not closed.
> is it related to the state backend configuration.
>
> thanks,
>
> Miki
>
>
>


Re: Writing stream to Hadoop

2018-06-05 Thread Kostas Kloudas
Hi Miki,

Have you enabled checkpointing?

Kostas

> On Jun 5, 2018, at 11:14 AM, miki haiat  wrote:
> 
> Im trying to write some data to Hadoop by using this code 
> 
> The state backend is set without time
> StateBackend sb = new 
> FsStateBackend("hdfs://***:9000/flink/my_city/checkpoints");
> env.setStateBackend(sb);
> BucketingSink> sink =
> new BucketingSink<>("hdfs://:9000/mycity/raw");
> sink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"));
> sink.setInactiveBucketCheckInterval(12);
> sink.setInactiveBucketThreshold(12);
> the result is that all the files are stuck in in.programs  status and not 
> closed.
> is it related to the state backend configuration.
> 
> thanks,
> 
> Miki 
> 



How to read from Cassandra using Apache Flink?

2018-06-05 Thread HarshithBolar
My flink program should do a Cassandra look up for each input record and
based on the results, should do some further processing.

But I'm currently stuck at reading data from Cassandra. This is the code
snippet I've come up with so far.

> ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
>     @Override
>     protected Cluster buildCluster(Cluster.Builder builder) {
>         return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
>                 .withPort(props.getCassandraPort())
>                 .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
>                 .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
>                 .build();
>     }
> };
>
> for (int i = 1; i < 5; i++) {
>     CassandraInputFormat<Tuple8<...>> cassandraInputFormat =
>             new CassandraInputFormat<>("select * from test where id=hello" + i,
>                     secureCassandraSinkClusterBuilder);
>     cassandraInputFormat.configure(null);
>     cassandraInputFormat.open(null);
>     Tuple8<...> out = new Tuple8<>();
>     cassandraInputFormat.nextRecord(out);
>     System.out.println(out);
> }
But the issue with this is that it takes nearly 10 seconds for each lookup; in
other words, this for loop takes 50 seconds to execute.

How do I speed up this operation? Alternatively, is there any other way of
looking up Cassandra in Flink?





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Checkpointing on cluster shutdown

2018-06-05 Thread Data Engineer
Hi,

Suppose I have a working Flink cluster with 1 taskmanager and 1 jobmanager
and I have enabled checkpointing with say an interval of 1 minute.
Now if I shut down the Flink cluster in between checkpoints (say for some
upgrade), will the JobManager automatically trigger a checkpoint before
going down?

Or is it mandatory to manually trigger savepoints in these cases?
Also am I correct in my understanding that if a taskmanager goes down
first, there is no way the TaskManager can trigger the checkpoint on its
own?


Re: NPE in flink sql over-window

2018-06-05 Thread Fabian Hueske
Hi Yan,

Thanks for providing the logs and opening the JIRA issue!
Let's continue the discussion there.

Best, Fabian

2018-06-05 1:26 GMT+02:00 Yan Zhou [FDS Science] :

> Hi Fabian,
>
> I added some trace logs in ProcTimeBoundedRangeOver and think it should
> be a bug. The log should explain how cleanup_time_1 bypasses the 
> needToCleanupState
> check and causes NPE. A jira ticket [1] is created.
>
> Best
> Yan
>
>
> *[ts:1528149296456] [label:state_ttl_update] register for cleanup at
> 1528150096456(CLEANUP_TIME_1), because of Row:(orderId:001,userId:U123)*
> *[ts:1528149296456] [label:register_pt] register for process input at
> 1528149296457, because of Row:(orderId:001,userId:U123)*
> *[ts:1528149296458] [label:state_apply] ontimer at 1528149296457, apply
> Row:(orderId:001,userId:U123) to accumulator*
>
> *[ts:1528149885813] [label:state_ttl_update] register at
> 1528150685813(CLEANUP_TIME_2), because of Row:(orderId:002,userId:U123)*
> *[ts:1528149885813] [label:register_pt] register for process input at
> 1528149885814, because of Row:(orderId:002,userId:U123)*
> *[ts:1528149885814] [label:state_apply] ontimer at 1528149885814, apply
> Row:(orderId:002,userId:U123) to accumulator*
>
> *[ts:1528150096460] [label:NO_ELEMENTS_IN_STATE] ontimer at
> 1528150096456(CLEANUP_TIME_1), bypass needToCleanupState check, however
> rowMapState is {key:1528150096455, value:[]}*
>
> *[ts:1528150685815] [label:state_timeout] ontimer at
> 1528150685813(CLEANUP_TIME_2), clean/empty the rowMapState
> [{key:1528149885813, value:[Row:(orderId:002,userId:U123)]}]*
>
>
>
>
>
>
>
>
> [1] : https://issues.apache.org/jira/browse/FLINK-9524
>
>
> --
> *From:* Yan Zhou [FDS Science] 
> *Sent:* Monday, June 4, 2018 4:05 PM
> *To:* Fabian Hueske
>
> *Cc:* Dawid Wysakowicz; user
> *Subject:* Re: NPE in flink sql over-window
>
>
> Hi Fabian,
>
>
> Yes, the NPE cause the job failure and recovery( instead of being the
> result of a recovery).
>
> And yet, during the recovery, the same exceptions are thrown from same
> line.
>
>
> Best
>
> Yan
> --
> *From:* Fabian Hueske 
> *Sent:* Thursday, May 31, 2018 12:09:03 AM
> *To:* Yan Zhou [FDS Science]
> *Cc:* Dawid Wysakowicz; user
> *Subject:* Re: NPE in flink sql over-window
>
> Hi Yan,
>
> Thanks for the details and for digging into the issue.
> If I got it right, the NPE caused the job failure and recovery (instead of
> being the result of a recovery), correct?
>
> Best, Fabian
>
> 2018-05-31 7:00 GMT+02:00 Yan Zhou [FDS Science] :
>
> Thanks for the replay.
>
>
> Yes, it only happen if I config the idle state retention times. The error
> occurs the first time before the first recovery. I haven't run with
> proctime but rowtime in flink 1.4.x. I am not sure if it will cause
> problems with proctime in 1.4.x.
>
>
> I am adding some trace log for ProcTimeBoundedRangeOver. I will update
> with my test result and fire a JIRA after that.
>
>
> Best
>
> Yan
> --
> *From:* Fabian Hueske 
> *Sent:* Wednesday, May 30, 2018 1:43:01 AM
> *To:* Dawid Wysakowicz
> *Cc:* user
> *Subject:* Re: NPE in flink sql over-window
>
> Hi,
>
> Dawid's analysis is certainly correct, but looking at the code this should
> not happen.
>
> I have a few questions:
> - You said this only happens if you configure idle state retention times,
> right?
> - Does the error occur the first time without a previous recovery?
> - Did you run the same query on Flink 1.4.x without any problems?
>
> Thanks, Fabian
>
> 2018-05-30 9:25 GMT+02:00 Dawid Wysakowicz :
>
> Hi Yan,
>
>
> I think it is a bug in the ProcTimeBoundedRangeOver. It tries to access a
> list of elements that was already cleared and does not check against null.
> Could you please file a JIRA for that?
>
>
> Best,
>
> Dawid
>
> On 30/05/18 08:27, Yan Zhou [FDS Science] wrote:
>
> I also get warnning that CodeCache is full around that time. It's printed
> by JVM and doesn't have timestamp. But I suspect that it's because so
> many failure recoveries from checkpoint and the sql queries are dynamically
> compiled too many times.
>
>
>
> *Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler
> has been disabled.*
> *Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache
> size using -XX:ReservedCodeCacheSize=*
> *CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb*
> *bounds [0x7fa4fd00, 0x7fa50c00, 0x7fa50c00]*
> *total_blobs=54308 nmethods=53551 adapters=617*
> *compilation: disabled (not enough contiguous free space left)*
>
>
>
> --
> *From:* Yan Zhou [FDS Science]  
> *Sent:* Tuesday, May 29, 2018 10:52:18 PM
> *To:* user@flink.apache.org
> *Subject:* NPE in flink sql over-window
>
>
> Hi,
>
> I am using flink sql 1.5.0. My application throws NPE. And after it
> recover from checkpoint automatically, it throws NPE immediately from same
> line of code.
>
>
> My application 

Re: Writing stream to Hadoop

2018-06-05 Thread Marvin777
 I think you can look at this comment, thanks.

* Part files can be in one of three states: {@code in-progress},
{@code pending} or {@code finished}.
* The reason for this is how the sink works together with the
checkpointing mechanism to provide exactly-once
* semantics and fault-tolerance. The part file that is currently being
written to is {@code in-progress}. Once
* a part file is closed for writing it becomes {@code pending}. When a
checkpoint is successful the currently
* pending files will be moved to {@code finished}.






2018-06-05 17:14 GMT+08:00 miki haiat :

> Im trying to write some data to Hadoop by using this code
>
> The state backend is set without time
>
> StateBackend sb = new 
> FsStateBackend("hdfs://***:9000/flink/my_city/checkpoints");
> env.setStateBackend(sb);
>
> BucketingSink> sink =
> new BucketingSink<>("hdfs://:9000/mycity/raw");
> sink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"));
> sink.setInactiveBucketCheckInterval(12);
> sink.setInactiveBucketThreshold(12);
>
> the result is that all the files are stuck in* in.programs  *status and
> not closed.
> is it related to the state backend configuration.
>
> thanks,
>
> Miki
>
>


Writing stream to Hadoop

2018-06-05 Thread miki haiat
Im trying to write some data to Hadoop by using this code

The state backend is set without time

StateBackend sb = new
FsStateBackend("hdfs://***:9000/flink/my_city/checkpoints");
env.setStateBackend(sb);

BucketingSink> sink =
new BucketingSink<>("hdfs://:9000/mycity/raw");
sink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"));
sink.setInactiveBucketCheckInterval(12);
sink.setInactiveBucketThreshold(12);

the result is that all the files are stuck in "in-progress" status and not
closed.
Is it related to the state backend configuration?

thanks,

Miki


Re: Cannot determine simple type name - [FLINK-7490]

2018-06-05 Thread Timo Walther

This sounds similar to https://issues.apache.org/jira/browse/FLINK-9220.

Can you explain the steps that I have to do to reproduce the error?

Regards,
Timo

Am 05.06.18 um 08:06 schrieb Chesnay Schepler:
Please re-open the issue. It would be great if you could also provide 
us with a reproducing example.


On 04.06.2018 22:47, rakeshchalasani wrote:
I am getting an error when using the Table API with ScalarFunction. 
This is
previously reported here 
(https://issues.apache.org/jira/browse/FLINK-7490)

as loading a wrong class loader and is reported as fixed. But the issue
persists even after using the latest code from 1.3 branch.

From some of my experiments, this might be a potential place where the issue 
is, but I am not sure:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala#L746 



Our application is such that the ScalarFunction is defined and 
instantiated
in a different package from where the topology is executed. Any 
direction on

how to work around this or what I am doing wrong is much appreciated!

Thanks,
Rakesh

org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be

compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)

at
org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35) 


at
org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49) 


at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) 


at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) 


at
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) 


at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) 


at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) 


at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.commons.compiler.CompileException: Line 12, 
Column

21: Cannot determine simple type name "ax"
at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)

at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6494) 


at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6255) 


at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6268) 


at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6268) 


at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6268) 


at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6268) 


at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6268) 


at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6268) 


at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6234)
at 
org.codehaus.janino.UnitCompiler.access$13400(UnitCompiler.java:212)

at
org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6142) 


at
org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6137) 


at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3757)
at 
org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6137)
at 
org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6130)

at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3756)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6130)
at 
org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7926)

at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6534)
at 
org.codehaus.janino.UnitCompiler.access$13900(UnitCompiler.java:212)

at
org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6160) 


at
org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6155) 


at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4139)
at
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6155) 


at
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6151) 


at org.codehaus.janino.Java$Lvalue.accept(Java.java:3977)
at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6151)
at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6130)

at org.codehaus.janino.Java$Rvalue.accept(Java.java:3945)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6130)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6516)
at 
org.codehaus.janino.UnitCompiler.access$13700(UnitCompiler.java:212)

at

Re: Checkpointing when reading from files?

2018-06-05 Thread Fabian Hueske
Hi,

The continuous file source is split into two components. 1) A split
generator that monitors a directory and generates splits when a new file is
observed, and 2) reading tasks that receive splits and read the referenced
files.

I think this is the code that generates input splits which are distributed
to reading tasks.
In PROCESS_ONCE mode, all files are listed once, splits are generated and
forwarded to the reading tasks.
At that point, the split generator can stop because it did all the work.
The reading tasks received all splits and maintain them in state.
When the job needs to recover, the split generator won't re-emit splits
because they are already checkpointed by the reader tasks.

@Padarn: There is no way to force a checkpoint from within an application.
Checkpoints are triggered by the JobManager.

Best, Fabian
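
For illustration, a minimal sketch that combines readFile with checkpointing
(the path, the intervals and the PROCESS_CONTINUOUSLY mode are placeholder
assumptions):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadFileWithCheckpoints {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoints are triggered by the JobManager at this interval.
        env.enableCheckpointing(60_000L);

        String inputPath = "hdfs:///path/to/input"; // placeholder

        // The split generator monitors the path; the reader tasks keep the
        // splits they received in checkpointed state.
        DataStream<String> text = env.readFile(
                new TextInputFormat(new Path(inputPath)),
                inputPath,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                1000L);

        text.print();
        env.execute("read-file-with-checkpoints");
    }
}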

2018-05-27 14:52 GMT+02:00 Padarn Wilson :

> I'm a bit confused about this too actually. I think the above would work
> as a solution if you want to continuously monitor a directory, but for a
> "PROCESS_ONCE" readFile source I don't think you will get a checkpoint
> emitted indicating the end of the stream.
>
> My understanding of this is that there can be no checkpoints created while 
> the file directory
>
> Trying to dig into the java code I found this:
>
> case PROCESS_ONCE:
>synchronized (checkpointLock) {
>
>   // the following check guarantees that if we restart
>   // after a failure and we managed to have a successful
>   // checkpoint, we will not reprocess the directory.
>
>   if (globalModificationTime == Long.MIN_VALUE) {
>  monitorDirAndForwardSplits(fileSystem, context);
>  globalModificationTime = Long.MAX_VALUE;
>   }
>   isRunning = false;
>}
>break;
>
> My understanding of this is that there can be no checkpoints created while 
> the file directory is read, and then once it is read the isRunning flat is 
> set to false, which means no new checkpoints are emitted.
>
> Is this correct? If so, is it possible to somehow force a checkpoint to be 
> emitted on the completion of the source?
>
>
>
> On Tue, May 22, 2018 at 3:24 AM Amit Jain  wrote:
>
>> Hi Alex,
>>
>> StreamingExecutionEnvironment#readFile is a helper function to create
>> file reader data streaming source. It uses
>> ContinuousFileReaderOperator and ContinuousFileMonitoringFunction
>> internally.
>>
>> As both file reader operator and monitoring function uses
>> checkpointing so is readFile [1], you can go with first approach.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-
>> release-1.4/api/java/org/apache/flink/streaming/api/environment/
>> StreamExecutionEnvironment.html#readFile-org.apache.flink.api.common.io.
>> FileInputFormat-java.lang.String-org.apache.flink.
>> streaming.api.functions.source.FileProcessingMode-
>> long-org.apache.flink.api.common.typeinfo.TypeInformation-
>>
>>
>> --
>> Thanks,
>> Amit
>>
>>
>> On Mon, May 21, 2018 at 9:39 PM, NEKRASSOV, ALEXEI 
>> wrote:
>> > I want to add checkpointing to my program that reads from a set of
>> files in
>> > a directory. Without checkpointing I use readFile():
>> >
>> >
>> >
>> >   DataStream text = env.readFile(
>> >
>> >new TextInputFormat(new Path(inputPath)),
>> >
>> >inputPath,
>> >
>> >   inputProcessingMode,
>> >
>> >   1000);
>> >
>> >
>> >
>> > Should I use ContinuousFileMonitoringFunction /
>> ContinuousFileReaderOperator
>> > to add checkpointing? Or is there an easier way?
>> >
>> >
>> >
>> > How do I go from splits (that ContinuousFileMonitoringFunction
>> provides) to
>> > actual strings? I’m not clear how ContinuousFileReaderOperator can be
>> used.
>> >
>> >
>> >
>> >   DataStreamSource split =
>> > env.addSource(
>> >
>> >new ContinuousFileMonitoringFuncti
>> on(
>> >
>> >  new TextInputFormat(new
>> > Path(inputPath)),
>> >
>> >  inputProcessingMode,
>> >
>> >  1,
>> >
>> >  1000)
>> >
>> >   );
>> >
>> >
>> >
>> > Thanks,
>> > Alex
>>
>


Re: Output batch to Kafka

2018-06-05 Thread Stephan Ewen
 You could go with Chesnay's suggestion, which might be the quickest fix.

Creating a KafkaOutputFormat (possibly wrapping the KafkaProducer) would be
a bit cleaner. Would be happy to have that as a contribution, actually ;-)

If you care about producing "exactly once" using Kafka Transactions (Kafka
0.11+), it may be a tad bit more involved - please let me know if you want
details there.


On Tue, Jun 5, 2018 at 8:10 AM, Chesnay Schepler  wrote:

> This depends a little bit on your requirements.
> If it just about reading data from HDFS and writing it into Kafka, then it
> should be possible to simply wrap a KafkaProducer in a RichMapFunction that
> you use as a sink in your DataSet program.
>
> However you could also use the Streaming API for that.
>
>
> On 05.06.2018 00:39, Oleksandr Nitavskyi wrote:
>
> Hello Squirrels,
>
>
>
> Flink has a wonderful Kafka connector. We need to move data from HDFS to
> Kafka. Confluent is proposing to use Kafka-connect for this, but probably
> it can be easier to use Flink for such task, much higher abstraction, less
> details to manage, easier for our context.
>
>
>
> Do you know is there a way to output data into the Kafka using the Batch
> approach?
>
>
>
> Thanks
>
> Kind Regards
>
> Oleksandr Nitavskyi
>
>
>
>
>


Re: Ask for SQL using kafka in Flink

2018-06-05 Thread Timo Walther
@Shuyi: Yes, a more advanced table example would be helpful anyway and 
combining it with Kafka/Avro end-to-end would be even better.


@Will: I totally agree that the current connector ecosystem could be 
improved. This is also on the mid-term roadmap. Contributors who could help 
here are very welcome. We also took a step towards improving the 
situation with [1][2] etc.


[1] https://issues.apache.org/jira/browse/FLINK-8240
[2] https://issues.apache.org/jira/browse/FLINK-8630

Regards,
Timo


Am 04.06.18 um 23:06 schrieb Will Du:
Yes, I am also looking for Kafka Avro table examples in 
Java and on the command line. Also, a Kafka Avro table sink is still missing. 
In addition, once we have the Kafka topic, the API should read the schema 
directly from a schema file or a schema registry. The way the current API 
supports this lacks flexibility, just my own opinion.


Sent from my iPhone

On Jun 4, 2018, at 14:29, Shuyi Chen > wrote:


Given the popularity of Flink SQL and Kafka as streaming source, I 
think we can add some examples of using Kafka[XXX]TableSource in 
flink-examples/flink-examples-table module. What do you guys think?


Cheers
Shuyi

On Mon, Jun 4, 2018 at 12:57 AM, Timo Walther > wrote:


Hi,

as you can see in code [1] Kafka09JsonTableSource takes a
TableSchema. You can create table schema from type information
see [2].

Regards,
Timo

[1]

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java


[2]

https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
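
To make that concrete, a rough, untested sketch of passing a TableSchema to the
constructor described above (topic, properties, field names and types are
placeholders, and whether this exact constructor or the corresponding builder is
the public entry point may depend on the Flink version):

import java.util.Properties;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSource;
import org.apache.flink.table.api.TableSchema;

public class KafkaJsonTableSourceSketch {
    public static Kafka09JsonTableSource create() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "flink-sql-demo");          // placeholder

        // Placeholder schema: adjust field names and types to the JSON messages.
        TableSchema schema = new TableSchema(
                new String[] {"user", "ts"},
                new TypeInformation<?>[] {BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO});

        // Table schema and JSON schema are kept identical here for simplicity.
        return new Kafka09JsonTableSource("my-topic", props, schema, schema);
    }
}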



Am 02.06.18 um 18:31 schrieb Radhya Sahal:

Thanks Rong,

I used Flink 1.3.0 in case of using Flink 1.5 how can I
define jsonschema?

Yes, there are two names but now I put one name only and I
want to define
jsonschema.



Rong Rong wrote

Hi Radhya,

Can you provide which Flink version you are using? Based
on the latest
FLINK 1.5 release, Kafka09JsonTableSource takes:

/**
  * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
  *
  * @param topic       Kafka topic to consume.
  * @param properties  Properties for the Kafka consumer.
  * @param tableSchema The schema of the table.
  * @param jsonSchema  The schema of the JSON messages to
decode from
Kafka.
  */

Also, your type definition: TypeInformation

  typeInfo2 = Types.ROW(...
arguments seem to have different length for schema names
and types.

Thanks,
Rong

On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal 
radhya.sahal@
 wrote:

Hi,

Could anyone help me to solve this problem


/Exception in thread "main" java.lang.Error:
Unresolved compilation
problem:
         The constructor
Kafka09JsonTableSource(String, Properties,
TypeInformation


) is undefined

/
*--This is the code *
public class FlinkKafkaSQL {
         public static void main(String[] args)
throws Exception {
             // Read parameters from command line
             final ParameterTool params =
ParameterTool.fromArgs(args);

             if(params.getNumberOfParameters() < 5) {
                 System.out.println("\nUsage:
FlinkReadKafka " +
"--read-topic


  " +

"--write-topic


  " +

"--bootstrap.servers


  " +

"zookeeper.connect" +
                                    "--group.id



");

                 return;
             }

             // setup streaming environment
             StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();


env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,

Re: flink-connector-cassandra for C* version > 3.0

2018-06-05 Thread Chesnay Schepler
cassandra.version is only used for testing. driver.version is the 
interesting property for your use-case and refers to the DataStax 
driver, which at the moment is set to 3.0.0.


You can build the connector with the version you desire by running mvn 
package -Ddriver.version=3.X.Y


On 05.06.2018 07:57, Meghashyam Sandeep V wrote:

Hi Guys,

Has anyone used flink-connector-cassandra with Cassandra version >= 
3.X? I see that the Cassandra version supported is only 2.2.5 in the 
latest branch. 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/pom.xml#L40

is there a way to pass these properties to fetch cassandra >=3.x version?

Thanks,
Sandeep





Re: JVM metrics disappearing after job crash, restart

2018-06-05 Thread Chesnay Schepler
The config looks OK to me. On the Flink side I cannot find an 
explanation why only some metrics disappear.


The only explanation I could come up with at the moment is that 
FLINK-8946 is triggered, all metrics are (officially) unregistered, but 
the reporter isn't removing some metrics (i.e. all job related ones).
Due to FLINK-8946 no new metrics would be registered after the JM 
restart, but the old metrics continue to be reported.


To verify this I would add logging statements to the 
notifyOfAddedMetric/notifyOfRemovedMetric methods, to check whether 
Flink attempts to unregister all metrics or only some.
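
For illustration, a minimal sketch of a reporter that only logs these
notifications (a generic stand-in, not the actual NewRelicReporter):

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingMetricReporter implements MetricReporter {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricReporter.class);

    @Override
    public void open(MetricConfig config) {}

    @Override
    public void close() {}

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        LOG.info("Added metric: {}", group.getMetricIdentifier(metricName));
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        LOG.info("Removed metric: {}", group.getMetricIdentifier(metricName));
    }
}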


On 05.06.2018 02:02, Nikolas Davis wrote:

Fabian,

It does look like it may be related. I'll add a comment. After digging 
a bit more I found that the crash and lack of metrics were 
precipitated by the JobManager instance crashing and cycling, which 
caused the job to restart.



Chesnay,

I didn't see anything interesting in our logs. Our reporter config is 
fairly straightforward (I think):


metrics.reporter.nr.class: com.newrelic.flink.NewRelicReporter
metrics.reporter.nr.interval: 60 SECONDS
metrics.reporters: nr

Nik Davis
Software Engineer
New Relic

On Mon, Jun 4, 2018 at 1:56 AM, Chesnay Schepler > wrote:


Can you show us the metrics-related configuration parameters in
flink-conf.yaml?

Please also check the logs for any warnings from the MetricGroup
and MetricRegistry classes.


On 04.06.2018 10:44, Fabian Hueske wrote:

Hi Nik,

Can you have a look at this JIRA ticket [1] and check if it is
related to the problems your are facing?
If so, would you mind leaving a comment there?

Thank you,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8946


2018-05-31 4:41 GMT+02:00 Nikolas Davis mailto:nda...@newrelic.com>>:

We keep track of metrics by using the value of
MetricGroup::getMetricIdentifier, which returns the fully
qualified metric name. The query that we use to monitor
metrics filters for metrics IDs that
match '%Status.JVM.Memory%'. As long as the new metrics come
online via the MetricReporter interface then I think the
chart would be continuous; we would just see the old JVM
memory metrics cycle into new metrics.

Nik Davis
Software Engineer
New Relic

On Wed, May 30, 2018 at 5:30 PM, Ajay Tripathy
mailto:aj...@yelp.com>> wrote:

How are your metrics dimensionalized/named? Task managers
often have UIDs generated for them. The task id dimension
will change on restart. If you name your metric based on
this 'task_id' there would be a discontinuity with the
old metric.

On Wed, May 30, 2018 at 4:49 PM, Nikolas Davis
mailto:nda...@newrelic.com>> wrote:

Howdy,

We are seeing our task manager JVM metrics disappear
over time. This last time we correlated it to our job
crashing and restarting. I wasn't able to grab the
failing exception to share. Any thoughts?

We track metrics through the MetricReporter
interface. As far as I can tell this more or less
only affects the JVM metrics. I.e. most / all other
metrics continue reporting fine as the job is
automatically restarted.

Nik Davis
Software Engineer
New Relic











Re: Output batch to Kafka

2018-06-05 Thread Chesnay Schepler

This depends a little bit on your requirements.
If it just about reading data from HDFS and writing it into Kafka, then 
it should be possible to simply wrap a KafkaProducer in a 
RichMapFunction that you use as a sink in your DataSet program.


However you could also use the Streaming API for that.
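
For the RichMapFunction route, a rough sketch (topic, bootstrap servers and the
String element type are placeholder assumptions):

import java.util.Properties;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Forwards each record unchanged and writes it to Kafka as a side effect. */
public class KafkaWritingMapper extends RichMapFunction<String, String> {

    private transient KafkaProducer<byte[], byte[]> producer;

    @Override
    public void open(Configuration parameters) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092"); // placeholder
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    public String map(String value) {
        producer.send(new ProducerRecord<>("my-topic", value.getBytes()));
        return value;
    }

    @Override
    public void close() {
        if (producer != null) {
            producer.close(); // flushes any outstanding records
        }
    }
}

In the DataSet program it could be applied as the last step, for example:
dataSet.map(new KafkaWritingMapper()).output(new DiscardingOutputFormat<>());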

On 05.06.2018 00:39, Oleksandr Nitavskyi wrote:


Hello Squirrels,

Flink has a wonderful Kafka connector. We need to move data from HDFS 
to Kafka. Confluent is proposing to use Kafka-connect for this, but 
probably it can be easier to use Flink for such task, much higher 
abstraction, less details to manage, easier for our context.


Do you know is there a way to output data into the Kafka using the 
Batch approach?


Thanks

Kind Regards

Oleksandr Nitavskyi







Re: Cannot determine simple type name - [FLINK-7490]

2018-06-05 Thread Chesnay Schepler
Please re-open the issue. It would be great if you could also provide us 
with a reproducing example.


On 04.06.2018 22:47, rakeshchalasani wrote:

I am getting an error when using the Table API with a ScalarFunction. This was
previously reported here (https://issues.apache.org/jira/browse/FLINK-7490)
as the wrong class loader being loaded, and is marked as fixed. But the issue
persists even after using the latest code from the 1.3 branch.

From some of my experiments, this might be a potential place where the issue
is, but I am not sure:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala#L746

Our application is such that the ScalarFunction is defined and instantiated
in a different package from where the topology is executed. Any direction on
how to work around this or what I am doing wrong is much appreciated!

Thanks,
Rakesh

org.apache.flink.api.common.InvalidProgramException: Table program cannot be
compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at
org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
at
org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
at
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.commons.compiler.CompileException: Line 12, Column
21: Cannot determine simple type name "ax"
at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)
at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6494)
at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6255)
at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6268)
at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6268)
at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6268)
at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6268)
at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6268)
at
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6268)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6234)
at org.codehaus.janino.UnitCompiler.access$13400(UnitCompiler.java:212)
at
org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6142)
at
org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6137)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3757)
at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6137)
at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6130)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3756)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6130)
at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7926)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6534)
at org.codehaus.janino.UnitCompiler.access$13900(UnitCompiler.java:212)
at
org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6160)
at
org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6155)
at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4139)
at
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6155)
at
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6151)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:3977)
at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6151)
at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6130)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:3945)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6130)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6516)
at org.codehaus.janino.UnitCompiler.access$13700(UnitCompiler.java:212)
at
org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6158)
at

Re: [DISCUSS] Flink 1.6 features

2018-06-05 Thread Ben Yan
Hi Stephan,

Will  [ https://issues.apache.org/jira/browse/FLINK-5479 ]  (Per-partition
watermarks in FlinkKafkaConsumer should consider idle partitions) be
included in 1.6? As we are seeing more users with this issue on the mailing
lists.

Thanks.
Ben

2018-06-05 5:29 GMT+08:00 Che Lui Shum :

> Hi Stephan,
>
> Will FLINK-7129 (Support dynamically changing CEP patterns) be included in
> 1.6? There were discussions about possibly including it in 1.6:
> http://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3cCAMq=
> ou7gru2o9jtowxn1lc1f7nkcxayn6a3e58kxctb4b50...@mail.gmail.com%3e
>
> Thanks,
> Shirley Shum
>
>
> From: Stephan Ewen 
> To: d...@flink.apache.org, user 
> Date: 06/04/2018 02:21 AM
> Subject: [DISCUSS] Flink 1.6 features
> --
>
>
>
> Hi Flink Community!
>
> The release of Apache Flink 1.5 has happened (yay!) - so it is a good time
> to start talking about what to do for release 1.6.
>
> *== Suggested release timeline ==*
>
> I would propose to release around *end of July* (that is 8-9 weeks from
> now).
>
> The rationale behind that: There was a lot of effort in release testing
> automation (end-to-end tests, scripted stress tests) as part of release
> 1.5. You may have noticed the big set of new modules under
> "flink-end-to-end-tests" in the Flink repository. It delayed the 1.5
> release a bit, and needs to continue as part of the coming release cycle,
> but should help make releasing more lightweight from now on.
>
> (Side note: There are also some nightly stress tests that we created and
> run at data Artisans, and where we are looking whether and in which way it
> would make sense to contribute them to Flink.)
>
> *== Features and focus areas ==*
>
> We had a lot of big and heavy features in Flink 1.5, with FLIP-6, the new
> network stack, recovery, SQL joins and client, ... Following something like
> a "tick-tock-model", I would suggest to focus the next release more on
> integrations, tooling, and reducing user friction.
>
> Of course, this does not mean that no other pull request gets reviewed, and
> no other topic will be examined - it is simply meant as a help to
> understand where to expect more activity during the next release cycle.
> Note that these are really the coarse focus areas - don't read this as a
> comprehensive list.
>
> This list is my first suggestion, based on discussions with committers,
> users, and mailing list questions.
>
>   - Support Java 9 and Scala 2.12
>
>   - Smoothen the integration in Container environment, like "Flink as a
> Library", and easier integration with Kubernetes services and other proxies.
>
>   - Polish the remaining parts of the FLIP-6 rewrite
>
>   - Improve state backends with asynchronous timer snapshots, efficient
> timer deletes, state TTL, and broadcast state support in RocksDB.
>
>   - Extends Streaming Sinks:
>  - Bucketing Sink should support S3 properly (compensate for eventual
> consistency), work with Flink's shaded S3 file systems, and efficiently
> support formats that compress/index arcoss individual rows (Parquet, ORC,
> ...)
>  - Support ElasticSearch's new REST API
>
>   - Smoothen State Evolution to support type conversion on snapshot restore
>
>   - Enhance Stream SQL and CEP
>  - Add support for "update by key" Table Sources
>  - Add more table sources and sinks (Kafka, Kinesis, Files, K/V stores)
>  - Expand SQL client
>  - Integrate CEP and SQL, through MATCH_RECOGNIZE clause
>  - Improve CEP Performance of SharedBuffer on RocksDB
>
>
>
>