Help with a stream processing use case

2019-02-08 Thread Sandybayev, Turar (CAI - Atlanta)
Hi all,

I wonder whether it’s possible to use Flink for the following requirement. We
need to process a Kinesis stream and, based on values in each record, route
those records to different S3 buckets and keyspaces, with support for batching
up files and control over the partitioning scheme (so preferably through
Firehose).

I know it’s straightforward to have a Kinesis source and a Kinesis sink, and
then hook up Firehose to the sink on the AWS side, but I need a “fan out” to
potentially thousands of different buckets, based on the content of each event.
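
For illustration, here is a rough sketch of the shape I have in mind, assuming
the StreamingFileSink (Flink 1.6+) with a content-based BucketAssigner. The
routing logic and base path are placeholders, and as far as I understand this
only fans out to prefixes under a single base path, not to thousands of truly
separate buckets, which is the part I'm unsure about:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    // Sketch: derive the target "bucket" (really a path prefix) from each record.
    StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://base-bucket"), new SimpleStringEncoder<String>("UTF-8"))
        .withBucketAssigner(new BucketAssigner<String, String>() {
            @Override
            public String getBucketId(String record, Context context) {
                // placeholder routing logic: e.g. the first CSV field of the record
                return record.split(",")[0];
            }

            @Override
            public SimpleVersionedSerializer<String> getSerializer() {
                return SimpleVersionedStringSerializer.INSTANCE;
            }
        })
        .build();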

Thanks!
Turar




Re: Running with Docker image on ECS/Fargate?

2018-06-19 Thread Sandybayev, Turar (CAI - Atlanta)
Sorry, sent to a wrong group originally. My question is below:

Date: Tuesday, June 19, 2018 at 11:01 AM
To: "d...@flink.apache.org" 
Subject: Running with Docker image on ECS/Fargate?

Hi,

Has anyone tried to run a Flink cluster on AWS ECS? I couldn’t figure out how to
replicate “docker-compose scale taskmanager=4” with ecs-cli, and looking at
https://github.com/aws/amazon-ecs-cli/issues/166 , it seems like it’s impossible
at the moment?

If the above is true, is it generally recommended to use Kubernetes/Helm
instead, even on AWS?

Thanks,
Turar


Re: Problem producing to Kinesis

2018-06-15 Thread Sandybayev, Turar (CAI - Atlanta)
I’m seeing a different exception when producing to Kinesis, which seems to have
to do with backpressure handling:


java.lang.RuntimeException: An exception was thrown while processing a record: 
Rate exceeded for shard shardId-0026 in stream turar-test-output under 
account .

Rate exceeded for shard shardId-0026 in stream turar-test-output under 
account .

Rate exceeded for shard shardId-0026 in stream turar-test-output under 
account .

Rate exceeded for shard shardId-0026 in stream turar-test-output under 
account .

Record has reached expiration


When this exception occurs, the entire job restarts after cancelling all the
workers. We have the restart configuration set to 3, so after 3 restarts the
entire application dies. We’re also running 1.4.x on EMR, and rebuilding with
“-Daws.kinesis-kpl-version=0.12.6” as suggested below didn’t seem to help.

Is there a recommended solution to handle these kinds of exceptions without the 
entire application getting killed and without loss of data?
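
For reference, this is roughly how we construct the producer, plus the KPL
knobs we're considering. Treat it as a sketch: I'm not certain the 1.4 connector
forwards arbitrary KPL keys such as RecordTtl/RateLimit (newer versions do), and
setFailOnError(false) obviously trades the restarts for possible dropped records:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

    Properties producerConfig = new Properties();
    producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
    // Assumed KPL settings: let queued records live longer before "Record has
    // reached expiration", and stay further below the per-shard rate limit.
    producerConfig.put("RecordTtl", "60000");
    producerConfig.put("RateLimit", "50");

    FlinkKinesisProducer<String> producer =
            new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
    producer.setDefaultStream("turar-test-output");
    producer.setDefaultPartition("0");
    producer.setFailOnError(false);   // log instead of failing the job; risks dropping records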

Thanks,
Turar


From: "Tzu-Li (Gordon) Tai" 
Date: Friday, June 15, 2018 at 4:48 AM
To: "dyana.rose" , "user@flink.apache.org" 

Subject: Re: Problem producing to Kinesis

@Alexey

If you’d like to stick to 1.4.x for now, you can just do:
`mvn clean install -Daws.kinesis-kpl-version=0.12.6` when building the Kinesis 
connector, to upgrade the KPL version used.

I think we should add this to the documentation. Here’s a JIRA to track that - 
https://issues.apache.org/jira/browse/FLINK-9595.

Cheers,
Gordon

On 15 June 2018 at 9:48:33 AM, dyana.rose 
(dyana.r...@salecycle.com) wrote:
Porting and rebuilding 1.4.x isn't a big issue. I've done it on our fork, back
when I reported the upcoming issue, and we're running fine.

https://github.com/SaleCycle/flink/commit/d943a172ae7e6618309b45df848d3b9432e062d4

Ignore the CircleCI file, of course; the rest are the changes that I backported
from 1.5.

Dyana

On 2018/06/14 20:44:10, Alexey Tsitkin  wrote:
> Thanks Gordon!
>
> This indeed seems like the cause of the issue.
> I've run the program using 1.5.0, after building the appropriate connector,
> and it's working as expected.
>
> Wondering how difficult it would be to upgrade the 1.4 connector to a newer KPL
> version, as this kind of blocks running on EMR and producing to Kinesis...
> :-)
>
> Alexey
>
> On Thu, Jun 14, 2018 at 12:20 PM Tzu-Li (Gordon) Tai 
> wrote:
>
> > Hi,
> >
> > This could be related:
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-1-4-and-below-STOPS-writing-to-Kinesis-after-June-12th-td22687.html#a22701
> > .
> >
> > Shortly put, the KPL library version used by default in the 1.4.x Kinesis
> > connector is no longer supported by AWS.
> > Users would need to use an upgraded version >= 0.12.6 and build the Kinesis
> > connector for the producer to work.
> >
> > We should probably add a warning about this in the Kinesis connector docs.
> >
> > Cheers,
> > Gordon
> >
> > On 14 June 2018 at 9:16:04 PM, Lasse Nedergaard (lassenederga...@gmail.com)
> > wrote:
> >
> > Hi.
> >
> > We see the same error, and to my understanding it’s a known error from
> > Amazon. See:
> > https://github.com/awslabs/amazon-kinesis-producer/issues/39#issuecomment-396219522
> >
> > We don’t have a workaround and haven’t found the reason for the exception.
> > It is one of the reasons why we are moving to Kafka in the near future.
> >
> > Med venlig hilsen / Best regards
> > Lasse Nedergaard
> >
> >
> > On 14 Jun 2018 at 20:24, Alexey Tsitkin wrote:
> >
> > Hi,
> > I'm trying to run a simple program which consumes from one kinesis stream,
> > does a simple transformation, and produces to another stream.
> > Running on Flink 1.4.0.
> >
> > Code can be seen here (if needed I can also paste it directly on this
> > thread):
> >
> > https://stackoverflow.com/questions/50847164/flink-producing-to-kinesis-not-working
> >
> > Consuming the source stream works great, but trying to use the producer
> > causes the exception:
> >
> > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.DaemonException:
> > The child process has been shutdown and can no longer accept messages.
> >   at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:176)
> >   at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:477)
> >   at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.invoke(FlinkKinesisProducer.java:248)
> >   ...
> >
> >
> > Did anyone have something similar?
> > Or is there any way to debug the daemon itself, to understand the source
> > of the error?
> >
> > As you can see, this is a trivial example, which I mostly copy-pasted from
> > the documentation.
> >
> > Thanks,
> > Alexey
> >
> >
>


Re: Implementing a “join” between a DataStream and a “set of rules”

2018-06-05 Thread Sandybayev, Turar (CAI - Atlanta)
Hi Amit,

In my current approach, the idea for updating rule set data was to have some
kind of a "control" stream that would trigger an update to a local data
structure, or a "control" event within the main data stream that would trigger
the same.
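
To make that concrete, a minimal sketch of the "control stream" idea could look
like the following (Event, Rule and Match are hypothetical placeholder types;
the rule list is a plain operator-local field, so it is not checkpointed and
would need to be replayed after a restart):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.util.Collector;

    // Wiring (sketch): events.connect(ruleUpdates.broadcast()).flatMap(new RuleMatcher());
    public class RuleMatcher implements CoFlatMapFunction<Event, Rule, Match> {

        // local, non-checkpointed rule set
        private final List<Rule> rules = new ArrayList<>();

        @Override
        public void flatMap1(Event event, Collector<Match> out) {
            for (Rule rule : rules) {              // check each event against every rule
                if (rule.matches(event)) {
                    out.collect(new Match(rule, event));
                }
            }
        }

        @Override
        public void flatMap2(Rule update, Collector<Match> out) {
            rules.add(update);                     // a "control" event adds a rule
        }
    }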

Using an external system like a cache or database is also an option, but that
will still require some kind of trigger to reload the rule set, or a single
rule, whenever it is updated.

Others have suggested using Flink managed state, but I'm still not sure whether
that is a generally recommended approach in this scenario, as it seems like it
was meant more for windowing-type processing?

Thanks,
Turar

On 6/5/18, 8:46 AM, "Amit Jain"  wrote:

Hi Sandybayev,

In the current state, Flink does not provide a solution to the
mentioned use case. However, there is an open FLIP [1] [2] which has been
created to address it.

I can see that in your current approach you are not able to update the
rule set data. I think you can update the rule set data by building a
DataStream around changelogs which are stored in a message
queue or distributed file system.
OR
You can store the rule set data in an external system which you can query
for incoming keys from Flink.

[1]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
[2]: https://issues.apache.org/jira/browse/FLINK-6131

On Tue, Jun 5, 2018 at 5:52 PM, Sandybayev, Turar (CAI - Atlanta)
 wrote:
> Hi,
>
>
>
> What is the best practice recommendation for the following use case? We need
> to match a stream against a set of “rules”, which are essentially a Flink
> DataSet concept. Updates to this “rules set” are possible but not frequent.
> Each stream event must be checked against all the records in the “rules set”,
> and each match produces one or more events into a sink. The number of records
> in a rule set is in the six-digit range.
>
>
>
> Currently we're simply loading rules into a local List of rules and using
> flatMap over an incoming DataStream. Inside flatMap, we're just iterating
> over a list comparing each event to each rule.
>
>
>
> To speed up the iteration, we can also split the list into several batches,
> essentially creating a list of lists, and creating a separate thread to
> iterate over each sub-list (using Futures in either Java or Scala).
>
>
>
> Questions:
>
> 1. Is there a better way to do this kind of a join?
>
> 2. If not, is it safe to add additional parallelism by creating
> new threads inside each flatMap operation, on top of what Flink is already
> doing?
>
>
>
> Thanks in advance!
>
> Turar
>
>




Re: Implementing a “join” between a DataStream and a “set of rules”

2018-06-05 Thread Sandybayev, Turar (CAI - Atlanta)
Hi Aljoscha,

Thank you, this seems like a match for this use case. Am I understanding 
correctly that since only MemoryStateBackend is available for broadcast state, 
the max amount possible is 5MB?

If I use the Flink state mechanism for storing rules, I will still need to
iterate through all rules inside of a flatMap, and there’s no higher-level join
mechanism that I can employ, right? Is there any downside to trying to
parallelize that iteration inside my user flatMap operation?
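
Just to check my understanding of the API, the pattern would be roughly the
sketch below, assuming DataStream<Event> events and DataStream<Rule> rules
already exist (Event, Rule and Match are placeholders, as are matches() and
getId()). The per-event loop over the broadcast state is the part I'd
potentially want to parallelize:

    import java.util.Map;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    MapStateDescriptor<String, Rule> ruleDescriptor =
            new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

    BroadcastStream<Rule> ruleBroadcast = rules.broadcast(ruleDescriptor);

    events.connect(ruleBroadcast)
          .process(new BroadcastProcessFunction<Event, Rule, Match>() {

              @Override
              public void processElement(Event event, ReadOnlyContext ctx, Collector<Match> out) throws Exception {
                  // iterate over all broadcast rules for every incoming event
                  for (Map.Entry<String, Rule> entry : ctx.getBroadcastState(ruleDescriptor).immutableEntries()) {
                      if (entry.getValue().matches(event)) {
                          out.collect(new Match(entry.getValue(), event));
                      }
                  }
              }

              @Override
              public void processBroadcastElement(Rule rule, Context ctx, Collector<Match> out) throws Exception {
                  ctx.getBroadcastState(ruleDescriptor).put(rule.getId(), rule);   // add or update a rule
              }
          });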

Thanks
Turar

From: Aljoscha Krettek 
Date: Tuesday, June 5, 2018 at 12:05 PM
To: Amit Jain 
Cc: "Sandybayev, Turar (CAI - Atlanta)" , 
"user@flink.apache.org" 
Subject: Re: Implementing a “join” between a DataStream and a “set of rules”

Hi,

you might be interested in this newly-introduced feature: 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/broadcast_state.html

Best,
Aljoscha


On 5. Jun 2018, at 14:46, Amit Jain (aj201...@gmail.com) wrote:

Hi Sandybayev,

In the current state, Flink does not provide a solution to the
mentioned use case. However, there is an open FLIP [1] [2] which has been
created to address it.

I can see that in your current approach you are not able to update the
rule set data. I think you can update the rule set data by building a
DataStream around changelogs which are stored in a message
queue or distributed file system.
OR
You can store the rule set data in an external system which you can query
for incoming keys from Flink.

[1]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
[2]: https://issues.apache.org/jira/browse/FLINK-6131

On Tue, Jun 5, 2018 at 5:52 PM, Sandybayev, Turar (CAI - Atlanta)
(turar.sandyba...@coxautoinc.com) wrote:

Hi,



What is the best practice recommendation for the following use case? We need
to match a stream against a set of “rules”, which are essentially a Flink
DataSet concept. Updates to this “rules set” are possible but not frequent.
Each stream event must be checked against all the records in the “rules set”,
and each match produces one or more events into a sink. The number of records
in a rule set is in the six-digit range.



Currently we're simply loading rules into a local List of rules and using
flatMap over an incoming DataStream. Inside flatMap, we're just iterating
over a list comparing each event to each rule.



To speed up the iteration, we can also split the list into several batches,
essentially creating a list of lists, and creating a separate thread to
iterate over each sub-list (using Futures in either Java or Scala).



Questions:

1. Is there a better way to do this kind of a join?

2. If not, is it safe to add additional parallelism by creating
new threads inside each flatMap operation, on top of what Flink is already
doing?



Thanks in advance!

Turar




Re: Implementing a “join” between a DataStream and a “set of rules”

2018-06-05 Thread Sandybayev, Turar (CAI - Atlanta)
Thanks Garvit for your suggestion!

From: Garvit Sharma 
Date: Tuesday, June 5, 2018 at 8:44 AM
To: "Sandybayev, Turar (CAI - Atlanta)" 
Cc: "user@flink.apache.org" 
Subject: Re: Implementing a “join” between a DataStream and a “set of rules”

Hi,

For the above use case, you should do the following:

1. Convert your DataStream into a KeyedStream by defining a key which will be
used to match it against your rules.
2. Do the same as in 1 for the rules stream.
3. Join the two keyed streams using Flink's connect operator.
4. Store the rules in Flink's internal state, i.e. Flink's managed keyed state.
5. Validate the data coming in on the DataStream against the managed keyed state.
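
Putting steps 1-5 together, a rough sketch (not a definitive implementation)
could look like this, with Event, Rule and Match as placeholder types and
getKey() as whatever field both streams are keyed on:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    import org.apache.flink.util.Collector;

    events.keyBy(Event::getKey)
          .connect(rules.keyBy(Rule::getKey))
          .process(new CoProcessFunction<Event, Rule, Match>() {

              private transient ValueState<Rule> ruleState;   // managed keyed state (step 4)

              @Override
              public void open(Configuration parameters) {
                  ruleState = getRuntimeContext().getState(
                          new ValueStateDescriptor<>("rule", Rule.class));
              }

              @Override
              public void processElement1(Event event, Context ctx, Collector<Match> out) throws Exception {
                  Rule rule = ruleState.value();              // validate against the stored rule (step 5)
                  if (rule != null && rule.matches(event)) {
                      out.collect(new Match(rule, event));
                  }
              }

              @Override
              public void processElement2(Rule rule, Context ctx, Collector<Match> out) throws Exception {
                  ruleState.update(rule);                     // keep the latest rule for this key
              }
          });

Note that with this keyed approach each event is only checked against the
rule(s) stored under its own key, rather than against the full rule set.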

Refer to [1] [2] for more details.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/state.html



On Tue, Jun 5, 2018 at 5:52 PM, Sandybayev, Turar (CAI - Atlanta) 
(turar.sandyba...@coxautoinc.com) wrote:
Hi,

What is the best practice recommendation for the following use case? We need to 
match a stream against a set of “rules”, which are essentially a Flink DataSet 
concept. Updates to this “rules set” are possible but not frequent. Each stream
event must be checked against all the records in the “rules set”, and each match
produces one or more events into a sink. The number of records in a rule set is
in the six-digit range.

Currently we're simply loading rules into a local List of rules and using 
flatMap over an incoming DataStream. Inside flatMap, we're just iterating over 
a list comparing each event to each rule.

To speed up the iteration, we can also split the list into several batches, 
essentially creating a list of lists, and creating a separate thread to iterate 
over each sub-list (using Futures in either Java or Scala).

Questions:
1. Is there a better way to do this kind of a join?
2. If not, is it safe to add additional parallelism by creating new
threads inside each flatMap operation, on top of what Flink is already doing?

Thanks in advance!
Turar




--

Garvit Sharma
github.com/garvitlnmiit/

Nobody is a scholar by birth; it's only hard work and strong determination that
makes him a master.


Implementing a “join” between a DataStream and a “set of rules”

2018-06-05 Thread Sandybayev, Turar (CAI - Atlanta)
Hi,

What is the best practice recommendation for the following use case? We need to 
match a stream against a set of “rules”, which are essentially a Flink DataSet 
concept. Updates to this “rules set” are possible but not frequent. Each stream
event must be checked against all the records in the “rules set”, and each match
produces one or more events into a sink. The number of records in a rule set is
in the six-digit range.

Currently we're simply loading rules into a local List of rules and using 
flatMap over an incoming DataStream. Inside flatMap, we're just iterating over 
a list comparing each event to each rule.

To speed up the iteration, we can also split the list into several batches, 
essentially creating a list of lists, and creating a separate thread to iterate 
over each sub-list (using Futures in either Java or Scala).
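
Concretely, the kind of thing we have in mind inside the flatMap is roughly the
sketch below (Event, Rule and Match are placeholders, and loadAndPartitionRules()
is a hypothetical loader that splits the rules into sub-lists); results are only
collected on the task thread, since the Collector is not thread-safe:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class ParallelRuleFlatMap extends RichFlatMapFunction<Event, Match> {

        private transient List<List<Rule>> ruleBatches;
        private transient ExecutorService pool;

        @Override
        public void open(Configuration parameters) {
            ruleBatches = loadAndPartitionRules(4);   // hypothetical: load rules, split into 4 batches
            pool = Executors.newFixedThreadPool(4);
        }

        @Override
        public void flatMap(Event event, Collector<Match> out) throws Exception {
            List<Future<List<Match>>> futures = new ArrayList<>();
            for (List<Rule> batch : ruleBatches) {
                futures.add(pool.submit(() -> {
                    List<Match> matches = new ArrayList<>();
                    for (Rule rule : batch) {          // scan this batch on a worker thread
                        if (rule.matches(event)) {
                            matches.add(new Match(rule, event));
                        }
                    }
                    return matches;
                }));
            }
            for (Future<List<Match>> f : futures) {
                f.get().forEach(out::collect);         // emit on the task thread only
            }
        }

        @Override
        public void close() {
            if (pool != null) {
                pool.shutdown();
            }
        }
    }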

Questions:
1. Is there a better way to do this kind of a join?
2. If not, is it safe to add additional parallelism by creating new
threads inside each flatMap operation, on top of what Flink is already doing?

Thanks in advance!
Turar



Re: Looking for a working POM file example for EMR cluster

2018-06-01 Thread Sandybayev, Turar (CAI - Atlanta)
Thanks Georgi,

I ended up chucking the POM example from the AWS reference architecture and
re-generating a new POM from the Flink 1.4 archetype, then logging into the
master node via SSH and submitting the job directly. Using Steps in the EMR
Console doesn’t seem to be quite the same thing, I guess.

Thanks for your help!!

Turar

From: Georgi Stoyanov 
Date: Friday, June 1, 2018 at 2:18 AM
To: "Sandybayev, Turar (CAI - Atlanta)" , 
"user@flink.apache.org" 
Subject: RE: Looking for a working POM file example for EMR cluster




Hi,



Did you check solutions from here - 
https://stackoverflow.com/questions/48904881/could-not-resolve-substitution-to-a-value-akka-stream-materializer-in-aws-la



Regards,

Georgi Stoyanov



________
From: Sandybayev, Turar (CAI - Atlanta) 
Sent: Thursday, May 31, 2018 11:23:22 PM
To: user@flink.apache.org
Subject: Looking for a working POM file example for EMR cluster

Hi,

I'm looking for a sample POM file that works when running on an EMR cluster. I'm
new to Flink and EMR, so I'm simply following the AWS EMR documentation on Flink,
creating a Step and submitting my program JAR file. My program is just a slight
modification of the Wikipedia example.

I was trying to follow an example from AWS reference architecture for their 
Taxi events example: 
https://github.com/aws-samples/flink-stream-processing-refarch/blob/master/flink-taxi-stream-processor/pom.xml

However, I've been seeing various errors having to do with dependencies and 
ClassNotFoundExceptions for basic common Flink dependencies. I tried removing 
excludes from the maven-shade-plugin section of the POM file from the reference 
architecture, and now I'm seeing the following exception:

Exception in thread "main" 
com.typesafe.config.ConfigException$UnresolvedSubstitution: Could not resolve 
substitution to a value: ${akka.stream.materializer}

If I run a local Flink cluster and submit my JAR, I'm not seeing any issues 
with pretty much any way I modify the POM file. I would greatly appreciate if 
someone can point me to a working POM example.

Thanks!
Turar




Looking for a working POM file example for EMR cluster

2018-05-31 Thread Sandybayev, Turar (CAI - Atlanta)
Hi,

I'm looking for a sample POM file that works when running on an EMR cluster. I'm
new to Flink and EMR, so I'm simply following the AWS EMR documentation on Flink,
creating a Step and submitting my program JAR file. My program is just a slight
modification of the Wikipedia example.

I was trying to follow an example from AWS reference architecture for their 
Taxi events example: 
https://github.com/aws-samples/flink-stream-processing-refarch/blob/master/flink-taxi-stream-processor/pom.xml
 

However, I've been seeing various errors having to do with dependencies and 
ClassNotFoundExceptions for basic common Flink dependencies. I tried removing 
excludes from the maven-shade-plugin section of the POM file from the reference 
architecture, and now I'm seeing the following exception:

Exception in thread "main" 
com.typesafe.config.ConfigException$UnresolvedSubstitution: Could not resolve 
substitution to a value: ${akka.stream.materializer}

If I run a local Flink cluster and submit my JAR, I'm not seeing any issues 
with pretty much any way I modify the POM file. I would greatly appreciate if 
someone can point me to a working POM example.

Thanks!
Turar