Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Averell
Hello Fabian,

I created the JIRA bug https://issues.apache.org/jira/browse/FLINK-9940
BTW, I have one more question: is it worth checkpointing that list of
processed files? Does the current implementation of the file source guarantee
exactly-once?

Thanks for your support.






Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Averell
Thank you Fabian.

I implemented a quick test based on what you suggested - applying an offset
to the system time - and I did see an improvement: with an offset of 500 ms
the problem was completely gone. With an offset of 50 ms, I still had around
3-5 files missed out of 10,000. That remainder might come from clock skew
between the EC2 instance and S3.

I will now try to implement exactly what you suggested, and open a Jira
issue as well.

Thanks for your help.
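
Below is a minimal, self-contained Java sketch of the offset idea described above; it is not the actual ContinuousFileMonitoringFunction code. Files are only considered definitively processed up to (current time - offset), which tolerates clock skew between the monitoring process and S3. The offset value and the deduplication-by-path detail are assumptions for illustration.

import java.util.HashMap;
import java.util.Map;

/**
 * Simplified sketch of the "offset from system time" idea. Instead of advancing
 * the high-water mark to the max modification time seen so far, it is advanced
 * only to (now - offset), so files that show up late with slightly older
 * timestamps are not skipped.
 */
public class OffsetFileTracker {

    private final long offsetMillis;                 // e.g. 500 ms, as in the test above
    private long highWaterMark = Long.MIN_VALUE;
    private final Map<String, Long> seenInWindow = new HashMap<>();

    public OffsetFileTracker(long offsetMillis) {
        this.offsetMillis = offsetMillis;
    }

    /** Returns true if the file should be emitted for processing. */
    public boolean accept(String path, long modificationTime) {
        if (modificationTime < highWaterMark) {
            return false;                            // already covered by the high-water mark
        }
        // Inside the ambiguity window: deduplicate by path across listing passes.
        return seenInWindow.put(path, modificationTime) == null;
    }

    /** Called once after each listing pass. */
    public void advance(long nowMillis) {
        long mark = nowMillis - offsetMillis;
        if (mark > highWaterMark) {
            highWaterMark = mark;
            // Entries below the mark are rejected by accept() anyway, so drop them.
            seenInWindow.values().removeIf(t -> t < highWaterMark);
        }
    }
}

In each monitoring pass one would list the bucket, call accept() for every file, process the accepted ones, and then call advance(System.currentTimeMillis()).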





Best way to find the current alive jobmanager with HA mode zookeeper

2018-07-24 Thread Yuan,Youjun
Hi all,

I have a standalone cluster with 3 JobManagers and high-availability set to
zookeeper. Our client submits jobs via the REST API (POST /jars/:jarid/run),
which means we need to know the host of one of the currently alive
JobManagers. The problem is: how can we know which JobManager is alive, or
the host of the current leader? We don't want to access a dead JM.

Thanks.
Youjun Yuan
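
One possible approach, sketched below, is to probe each configured JobManager's REST endpoint and submit to whichever one responds. The hosts, the port, the /jobs/overview probe path, and the assumption that standby JobManagers refuse connections, return errors, or redirect to the leader are assumptions rather than something confirmed in this thread; reading the leader address directly from ZooKeeper is also possible but depends on Flink-internal znode layout.

import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.List;

/** Hedged sketch: find a responsive JobManager REST endpoint by probing candidates. */
public class JobManagerProbe {

    public static String findAliveRestEndpoint(List<String> baseUrls) {
        for (String base : baseUrls) {
            try {
                // "/jobs/overview" is assumed here; any cheap GET endpoint would do.
                HttpURLConnection conn =
                        (HttpURLConnection) new URL(base + "/jobs/overview").openConnection();
                conn.setConnectTimeout(2000);
                conn.setReadTimeout(2000);
                conn.setInstanceFollowRedirects(true);
                if (conn.getResponseCode() == 200) {
                    return base;                 // use this host for POST /jars/:jarid/run
                }
            } catch (Exception ignored) {
                // host down or not serving REST -> try the next candidate
            }
        }
        throw new IllegalStateException("No JobManager REST endpoint responded");
    }

    public static void main(String[] args) {
        List<String> candidates = Arrays.asList(          // hypothetical hosts
                "http://jm1:8081", "http://jm2:8081", "http://jm3:8081");
        System.out.println("Submitting via " + findAliveRestEndpoint(candidates));
    }
}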


Re: downgrade Flink

2018-07-24 Thread vino yang
Hi Cederic,

I just looked at the project you mentioned; its README file includes the
following statement:


*“flink-jpmml is tested with the latest Flink (i.e. 1.3.2), but any working
Apache Flink version (repo) should work properly.”*


The project was started only about a year ago, so it should not depend on
Flink versions prior to 1.0.

You may want to confirm this again.

Thanks, vino.


2018-07-25 6:44 GMT+08:00 Cederic Bosmans :

> Dear
>
> I am working on a streaming prediction model for which I want to try to
> use the flink-jpmml extension. (https://github.com/FlinkML/flink-jpmml)
> Unfortunately, it only supports the 0.7.0-SNAPSHOT and 0.6.1
> versions of Flink, and I am using the 1.7-SNAPSHOT version of Flink.
> How can I downgrade my version?
> (The examples are written for sbt and I am using Maven.)
> Thank you very much!
>
> Kind regards
> Cederic
>
>


Re: Flink 1.5 batch job fails to start

2018-07-24 Thread vino yang
Hi Alex,

Is it possible that the data has been corrupted?

Also, have you confirmed that the Avro version is consistent across the
different Flink versions?

And if you don't upgrade Flink and stay on version 1.3.1, can the job still
be recovered?

Thanks, vino.


2018-07-25 8:32 GMT+08:00 Alex Vinnik :

> Vino,
>
> Upgraded flink to Hadoop 2.8.1
>
> $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
> entrypoint | grep 'Hadoop version'
> 2018-07-25T00:19:46.142+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Hadoop version: 2.8.1
>
> but job still fails to start
>
> Ideas?
>
> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
> d84cccd3bffcba1f243352a5e5ef99a9.
> at org.apache.flink.runtime.dispatcher.Dispatcher.
> submitJob(Dispatcher.java:254)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
> AkkaRpcActor.java:247)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.
> handleRpcMessage(AkkaRpcActor.java:162)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(
> FencedAkkaRpcActor.java:70)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(
> AkkaRpcActor.java:142)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.
> onReceive(FencedAkkaRpcActor.java:40)
> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(
> UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> ... 4 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
> init>(JobManagerRunner.java:169)
> at org.apache.flink.runtime.dispatcher.Dispatcher$
> DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
> at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(
> Dispatcher.java:287)
> at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(
> Dispatcher.java:277)
> at org.apache.flink.runtime.dispatcher.Dispatcher.
> persistAndRunJob(Dispatcher.java:262)
> at org.apache.flink.runtime.dispatcher.Dispatcher.
> submitJob(Dispatcher.java:249)
> ... 21 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
> initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.
> HadoopOutputFormat@a3123a9)': Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
> failed: unread block data
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
> buildGraph(ExecutionGraphBuilder.java:220)
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
> buildGraph(ExecutionGraphBuilder.java:100)
> at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(
> JobMaster.java:1150)
> at org.apache.flink.runtime.jobmaster.JobMaster.
> createAndRestoreExecutionGraph(JobMaster.java:1130)
> at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:298)
> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<
> init>(JobManagerRunner.java:151)
> ... 26 more
> Caused by: java.lang.Exception: Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
> failed: unread block data
> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.
> initializeOnMaster(OutputFormatVertex.java:63)
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.
> buildGraph(ExecutionGraphBuilder.java:216)
> ... 31 more
> Caused by: java.lang.IllegalStateException: unread block data
> at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(
> ObjectInputStream.java:2781)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2285)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:2067)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:488)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:475)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> 

Re: Flink 1.5 batch job fails to start

2018-07-24 Thread Alex Vinnik
Vino,

Upgraded flink to Hadoop 2.8.1

$ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep
entrypoint | grep 'Hadoop version'
2018-07-25T00:19:46.142+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
version: 2.8.1

but job still fails to start

Ideas?

Caused by: org.apache.flink.util.FlinkException: Failed to submit job
d84cccd3bffcba1f243352a5e5ef99a9.
at
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:169)
at
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at
org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at
org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at
org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot
initialize task 'DataSink
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)':
Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
failed: unread block data
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:298)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat
(org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)
failed: unread block data
at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more


On Tue, Jul 24, 2018 at 10:32 AM vino yang  wrote:

> Hi Alex,
>
> Based on your log information, the potential reason is Hadoop version. To
> troubleshoot the exception comes from different Hadoop version. I suggest
> 

downgrade Flink

2018-07-24 Thread Cederic Bosmans
Dear

I am working on a streaming prediction model for which I want to try to use
the flink-jpmml extension. (https://github.com/FlinkML/flink-jpmml)
Unfortunately, it only supports the 0.7.0-SNAPSHOT and 0.6.1 versions
of Flink, and I am using the 1.7-SNAPSHOT version of Flink.
How can I downgrade my version?
(The examples are written for sbt and I am using Maven.)
Thank you very much!

Kind regards
Cederic


Re: Recommended fat jar excludes?

2018-07-24 Thread Chesnay Schepler
The previous list excluded a number of dependencies to prevent clashes
with Flink (for example, netty), which is no longer required.

If you could provide the output of "mvn dependency:tree", we might be
able to figure out why the jar is larger.


On 24.07.2018 20:49, jlist9 wrote:
We started out with a sample project from an earlier version of
flink-java. The sample project's pom.xml contained a long list of
<exclude> elements for building the fat jar. The fat jar size is
slightly over 100MB in our case.


We are looking to upgrade to Flink 1.5, so we updated the pom.xml using
one generated with the Maven command on the Flink 1.5 quick start page:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/quickstart/java_api_quickstart.html


The fat jar from the same code increases to over 200MB. I notice that
the new pom.xml file only has a very short <exclude> list (4 lines).
I wonder if there is a recommended exclude list for Flink 1.5? I could
use the exclude list from the earlier version and do a bit of trial
and error to clean it up. If there is an updated list for 1.5, it'll
probably save us a lot of time. File size isn't so much of an issue for
storage, but in our process we copy the pipeline jar files, so it'll
take twice as long before we can start the job.


Thanks!
Jack





Recommended fat jar excludes?

2018-07-24 Thread jlist9
We started out with a sample project from an earlier version of flink-java.
The sample project's pom.xml contained a long list of <exclude> elements
for building the fat jar. The fat jar size is slightly over 100MB in our
case.

We are looking to upgrade to Flink 1.5, so we updated the pom.xml using one
generated with the Maven command on the Flink 1.5 quick start page:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/quickstart/java_api_quickstart.html


The fat jar from the same code increases to over 200MB. I notice that the
new pom.xml file only has a very short <exclude> list (4 lines). I wonder
if there is a recommended exclude list for Flink 1.5? I could use the
exclude list from the earlier version and do a bit of trial and error to
clean it up. If there is an updated list for 1.5, it'll probably save us a
lot of time. File size isn't so much of an issue for storage, but in our
process we copy the pipeline jar files, so it'll take twice as long before
we can start the job.

Thanks!
Jack


Avro writer has already been opened

2018-07-24 Thread Chengzhi Zhao
Hi, there,

I am using the Avro format and writing data to S3. I recently upgraded from
Flink 1.3.2 to 1.5 and noticed the errors below.

I am using RocksDB, and checkpointDataUri is an S3 location.
My writer setup looks like the snippet below.

val writer = new AvroKeyValueSinkWriter[String, R](properties).duplicate()
sink.setWriter(writer.duplicate())



2018-07-24 17:50:44,012 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink:
Unnamed (4/4) (28f918a31d273e176409de3d4cb46c3c) switched from RUNNING
to FAILED.
java.lang.IllegalStateException: Writer has already been opened
at 
org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:69)
at 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:151)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:561)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
at 
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
2018-07-24 17:50:44,015 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
Discarding checkpoint 28 of job cc73a9db44814dc3d5a5ce497c8b0389
because: Writer has already been opened
2018-07-24 17:50:44,016 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
Enrollment Log Member and Chapter (cc73a9db44814dc3d5a5ce497c8b0389)
switched from state RUNNING to FAILING.
java.lang.IllegalStateException: Writer has already been opened
at 
org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:69)
at 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:151)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:561)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
at 
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)

Any help would be greatly appreciated. Thanks!

Regards,
Chengzhi
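
For comparison with the snippet above, a minimal Java sketch of how a writer is typically attached to a BucketingSink without calling duplicate() manually - the sink keeps the writer as a template and duplicates it internally when opening part files. The bucket path, the use of GenericRecord, and the schema handling are assumptions for illustration, and this is not claimed to be the fix for the exception above.

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class AvroBucketingSinkSketch {

    // keySchema/valueSchema are assumed to be built elsewhere for the record type.
    public static BucketingSink<Tuple2<String, GenericRecord>> buildSink(
            Schema keySchema, Schema valueSchema) {

        Map<String, String> properties = new HashMap<>();
        properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
        properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());

        BucketingSink<Tuple2<String, GenericRecord>> sink =
                new BucketingSink<>("s3://my-bucket/output");          // hypothetical path
        // Hand the sink one fresh writer instance; it is duplicated internally
        // for each bucket/part file.
        sink.setWriter(new AvroKeyValueSinkWriter<String, GenericRecord>(properties));
        return sink;
    }
}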


Re: Implement Joins with Lookup Data

2018-07-24 Thread ashish pok
The app is checkpointing, so it will pick up if an operation fails. I suppose
you mean a TM completely crashes; even in that case another TM would spin up
and it “should” pick up from the checkpoint. We are running YARN, but I would
assume TM recovery is possible in any other cluster as well. I haven't tested
this specifically during the init phase, but we have killed TMs during normal
stateful processing as a test case and don't remember seeing an issue.


- Ashish

On Tuesday, July 24, 2018, 12:31 PM, Harshvardhan Agrawal 
 wrote:

What happens when one of your workers dies? Say the machine is dead is not 
recoverable. How do you recover from that situation? Will the pipeline die and 
you go over the entire bootstrap process?
On Tue, Jul 24, 2018 at 11:56 ashish pok  wrote:

BTW, 
We got around bootstrap problem for similar use case using a “nohup” topic as 
input stream. Our CICD pipeline currently passes an initialize option to app IF 
there is a need to bootstrap and waits for X minutes before taking a savepoint 
and restart app normally listening to right topic(s). I believe there is work 
underway to handle this gracefully using Side Input as well. Other than 
determining X minutes for initialization to complete, we havent had any issue 
with this solution - we have over 40 million states refreshes daily and close 
to 200Mbps input streams being joined to states.
Hope this helps!


- Ashish

On Tuesday, July 24, 2018, 11:37 AM, Elias Levy  
wrote:

Alas, this suffers from the bootstrap problem.  At the moment Flink does not
allow you to pause a source (the positions), so you can't fully consume and
preload the accounts or products to perform the join before the positions start
flowing.  Additionally, Flink SQL does not support materializing an upsert table
for the accounts or products to perform the join, so you have to develop your
own KeyedProcessFunction, maintain the state, and perform the join on your own
if you only want to join against the latest value for each key.
On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann  wrote:

Yes, using Kafka which you initialize with the initial values and then feed 
changes to the Kafka topic from which you consume could be a solution.
On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal 
 wrote:

Hi Till,
How would we do the initial hydration of the Product and Account data since 
it’s currently in a relational DB? Do we have to copy over data to Kafka and 
then use them? 
Regards,
Harsh
On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:

Hi Harshvardhan,
I agree with Ankit that this problem could actually be solved quite elegantly 
with Flink's state. If you can ingest the product/account information changes 
as a stream, you can keep the latest version of it in Flink state by using a 
co-map function [1, 2]. One input of the co-map function would be the 
product/account update stream which updates the respective entries in Flink's 
state and the other input stream is the one to be enriched. When receiving 
input from this stream one would lookup the latest information contained in the 
operator's state and join it with the incoming event.
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
[2]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
Cheers,
Till
On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal 
 wrote:

Hi,
Thanks for your responses.
There is no fixed interval for the data being updated. It’s more like whenever 
you onboard a new product or there are any mandates that change will trigger 
the reference data to change.
It’s not just the enrichment we are doing here. Once we have enriched the data 
we will be performing a bunch of aggregations using the enriched data. 
Which approach would you recommend?
Regards,
Harshvardhan
On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:


How often is the product db updated? Based on that you can store product 
metadata as state in Flink, maybe setup the state on cluster startup and then 
update daily etc.

 

Also, just based on this feature, flink doesn’t seem to add a lot of value on 
top of Kafka. As Jorn said below, you can very well store all the events in an 
external store and then periodically run a cron to enrich later since your 
processing doesn’t seem to require absolute real time.

 

Thanks

Ankit

 

From: Jörn Franke 
Date: Monday, July 23, 2018 at 10:10 PM
To: Harshvardhan Agrawal 
Cc: 
Subject: Re: Implement Joins with Lookup Data

 

For the first one (lookup of single entries) you could use a NoSQL db (eg key 
value store) - a relational database will not scale.

 

Depending on when you need to do the enrichment you could also first store the 
data and enrich it later as part of a batch process. 


On 24. Jul 2018, at 05:25, Harshvardhan Agrawal  
wrote:


Hi,

 

We are using Flink for financial data enrichment and aggregations. We have 
Positions data that we are currently 

Re: LoggingFactory: Access to static method across operators

2018-07-24 Thread Till Rohrmann
Hi Jayant,

I think you should be able to implement your own StaticLoggerBinder which
returns your own LoggerFactory. That is quite similar to how the different
logging backends (log4j, logback) integrate with slf4j.

Cheers,
Till
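
A minimal sketch of what Till describes, assuming SLF4J 1.7.x-style bindings: the class must live in the org.slf4j.impl package and be the only SLF4J binding on the classpath. CustomLoggerFactory is a hypothetical class implementing org.slf4j.ILoggerFactory; once the binding is in place, every operator that calls LoggerFactory.getLogger(...) goes through it, so nothing needs to be passed via constructors.

package org.slf4j.impl;

import org.slf4j.ILoggerFactory;
import org.slf4j.spi.LoggerFactoryBinder;

public class StaticLoggerBinder implements LoggerFactoryBinder {

    // Checked reflectively by SLF4J to detect incompatible binding versions.
    public static String REQUESTED_API_VERSION = "1.7.25";

    private static final StaticLoggerBinder SINGLETON = new StaticLoggerBinder();

    // CustomLoggerFactory is hypothetical - substitute your own ILoggerFactory.
    private final ILoggerFactory loggerFactory = new CustomLoggerFactory();

    public static StaticLoggerBinder getSingleton() {
        return SINGLETON;
    }

    @Override
    public ILoggerFactory getLoggerFactory() {
        return loggerFactory;
    }

    @Override
    public String getLoggerFactoryClassStr() {
        return CustomLoggerFactory.class.getName();
    }
}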

On Tue, Jul 24, 2018 at 5:41 PM Jayant Ameta  wrote:

> I am using a custom LoggingFactory. Is there a way to provide access to
> this custom LoggingFactory to all the operators other than adding it to all
> constructors?
>
> This is somewhat related to:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Adding-Context-To-Logs-td7351.html
>
>
> Jayant
>


Re: Implement Joins with Lookup Data

2018-07-24 Thread Harshvardhan Agrawal
What happens when one of your workers dies? Say the machine is dead and not
recoverable. How do you recover from that situation? Will the pipeline die,
and do you go through the entire bootstrap process again?

On Tue, Jul 24, 2018 at 11:56 ashish pok  wrote:

> BTW,
>
> We got around bootstrap problem for similar use case using a “nohup” topic
> as input stream. Our CICD pipeline currently passes an initialize option to
> app IF there is a need to bootstrap and waits for X minutes before taking a
> savepoint and restart app normally listening to right topic(s). I believe
> there is work underway to handle this gracefully using Side Input as well.
> Other than determining X minutes for initialization to complete, we havent
> had any issue with this solution - we have over 40 million states refreshes
> daily and close to 200Mbps input streams being joined to states.
>
> Hope this helps!
>
>
>
> - Ashish
>
> On Tuesday, July 24, 2018, 11:37 AM, Elias Levy <
> fearsome.lucid...@gmail.com> wrote:
>
> Alas, this suffers from the bootstrap problem.  At the moment Flink does
> not allow you to pause a source (the positions), so you can't fully consume
> and preload the accounts or products to perform the join before the
> positions start flowing.  Additionally, Flink SQL does not support
> materializing an upsert table for the accounts or products to perform the
> join, so you have to develop your own KeyedProcessFunction, maintain the
> state, and perform the join on your own if you only want to join against
> the latest value for each key.
>
> On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann 
> wrote:
>
> Yes, using Kafka which you initialize with the initial values and then
> feed changes to the Kafka topic from which you consume could be a solution.
>
> On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi Till,
>
> How would we do the initial hydration of the Product and Account data
> since it’s currently in a relational DB? Do we have to copy over data to
> Kafka and then use them?
>
> Regards,
> Harsh
>
> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:
>
> Hi Harshvardhan,
>
> I agree with Ankit that this problem could actually be solved quite
> elegantly with Flink's state. If you can ingest the product/account
> information changes as a stream, you can keep the latest version of it in
> Flink state by using a co-map function [1, 2]. One input of the co-map
> function would be the product/account update stream which updates the
> respective entries in Flink's state and the other input stream is the one
> to be enriched. When receiving input from this stream one would lookup the
> latest information contained in the operator's state and join it with the
> incoming event.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>
> Cheers,
> Till
>
> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi,
>
> Thanks for your responses.
>
> There is no fixed interval for the data being updated. It’s more like
> whenever you onboard a new product or there are any mandates that change
> will trigger the reference data to change.
>
> It’s not just the enrichment we are doing here. Once we have enriched the
> data we will be performing a bunch of aggregations using the enriched data.
>
> Which approach would you recommend?
>
> Regards,
> Harshvardhan
>
> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:
>
> How often is the product db updated? Based on that you can store product
> metadata as state in Flink, maybe setup the state on cluster startup and
> then update daily etc.
>
>
>
> Also, just based on this feature, flink doesn’t seem to add a lot of value
> on top of Kafka. As Jorn said below, you can very well store all the events
> in an external store and then periodically run a cron to enrich later since
> your processing doesn’t seem to require absolute real time.
>
>
>
> Thanks
>
> Ankit
>
>
>
> *From: *Jörn Franke 
> *Date: *Monday, July 23, 2018 at 10:10 PM
> *To: *Harshvardhan Agrawal 
> *Cc: *
> *Subject: *Re: Implement Joins with Lookup Data
>
>
>
> For the first one (lookup of single entries) you could use a NoSQL db (eg
> key value store) - a relational database will not scale.
>
>
>
> Depending on when you need to do the enrichment you could also first store
> the data and enrich it later as part of a batch process.
>
>
> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi,
>
>
>
> We are using Flink for financial data enrichment and aggregations. We have
> Positions data that we are currently receiving from Kafka. We want to
> enrich that data with reference data like Product and Account information
> that is present in a relational database. From my understanding of Flink so
> far I think there are two ways 

Re: Implement Joins with Lookup Data

2018-07-24 Thread ashish pok
BTW,
We got around the bootstrap problem for a similar use case by using a “nohup”
topic as the input stream. Our CICD pipeline currently passes an initialize
option to the app IF there is a need to bootstrap, waits for X minutes, then
takes a savepoint and restarts the app normally, listening to the right
topic(s). I believe there is work underway to handle this gracefully using
Side Inputs as well. Other than determining the X minutes needed for
initialization to complete, we haven't had any issues with this solution - we
have over 40 million state refreshes daily and close to 200 Mbps of input
streams being joined to state.
Hope this helps!


- Ashish
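
A small sketch of the switch described above: the same job binary reads either a bootstrap topic or the live topic depending on a flag passed by the CI/CD pipeline. The flag name, topic names, broker address, and the Kafka 0.11 connector are assumptions.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class BootstrapSwitchJob {

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);

        // CI/CD passes --initialize true only for the one-off bootstrap run.
        boolean initialize = params.getBoolean("initialize", false);
        String topic = initialize ? "reference-bootstrap" : "reference-updates"; // hypothetical names

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", params.get("brokers", "localhost:9092"));
        kafkaProps.setProperty("group.id", "enrichment-job");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> referenceData = env.addSource(
                new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), kafkaProps));

        referenceData.print();   // placeholder for the real state-building / join logic
        env.execute(initialize ? "bootstrap-run" : "normal-run");
    }
}

After the bootstrap run has caught up, a savepoint is taken and the job is restarted without the flag, exactly as described above.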

On Tuesday, July 24, 2018, 11:37 AM, Elias Levy  
wrote:

Alas, this suffers from the bootstrap problem.  At the moment Flink does not
allow you to pause a source (the positions), so you can't fully consume and
preload the accounts or products to perform the join before the positions start
flowing.  Additionally, Flink SQL does not support materializing an upsert table
for the accounts or products to perform the join, so you have to develop your
own KeyedProcessFunction, maintain the state, and perform the join on your own
if you only want to join against the latest value for each key.
On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann  wrote:

Yes, using Kafka which you initialize with the initial values and then feed 
changes to the Kafka topic from which you consume could be a solution.
On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal 
 wrote:

Hi Till,
How would we do the initial hydration of the Product and Account data since 
it’s currently in a relational DB? Do we have to copy over data to Kafka and 
then use them? 
Regards,
Harsh
On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:

Hi Harshvardhan,
I agree with Ankit that this problem could actually be solved quite elegantly 
with Flink's state. If you can ingest the product/account information changes 
as a stream, you can keep the latest version of it in Flink state by using a 
co-map function [1, 2]. One input of the co-map function would be the 
product/account update stream which updates the respective entries in Flink's 
state and the other input stream is the one to be enriched. When receiving 
input from this stream one would lookup the latest information contained in the 
operator's state and join it with the incoming event.
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
[2]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
Cheers,
Till
On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal 
 wrote:

Hi,
Thanks for your responses.
There is no fixed interval for the data being updated. It’s more like whenever 
you onboard a new product or there are any mandates that change will trigger 
the reference data to change.
It’s not just the enrichment we are doing here. Once we have enriched the data 
we will be performing a bunch of aggregations using the enriched data. 
Which approach would you recommend?
Regards,
Harshvardhan
On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:


How often is the product db updated? Based on that you can store product 
metadata as state in Flink, maybe setup the state on cluster startup and then 
update daily etc.

 

Also, just based on this feature, flink doesn’t seem to add a lot of value on 
top of Kafka. As Jorn said below, you can very well store all the events in an 
external store and then periodically run a cron to enrich later since your 
processing doesn’t seem to require absolute real time.

 

Thanks

Ankit

 

From: Jörn Franke 
Date: Monday, July 23, 2018 at 10:10 PM
To: Harshvardhan Agrawal 
Cc: 
Subject: Re: Implement Joins with Lookup Data

 

For the first one (lookup of single entries) you could use a NoSQL db (eg key 
value store) - a relational database will not scale.

 

Depending on when you need to do the enrichment you could also first store the 
data and enrich it later as part of a batch process. 


On 24. Jul 2018, at 05:25, Harshvardhan Agrawal  
wrote:


Hi,

 

We are using Flink for financial data enrichment and aggregations. We have 
Positions data that we are currently receiving from Kafka. We want to enrich 
that data with reference data like Product and Account information that is 
present in a relational database. From my understanding of Flink so far I think 
there are two ways to achieve this. Here are two ways to do it:

 

1) First Approach:

a) Get positions from Kafka and key by product key.

b) Perform lookup from the database for each key and then obtain 
Tuple2

 

2) Second Approach:

a) Get positions from Kafka and key by product key.

b) Window the keyed stream into say 15 seconds each.

c) For each window get the unique product keys and perform a single lookup.

d) Somehow join Positions and Products

 

In the first approach we will be making a lot of calls to the DB and the 
solution is very chatty. Its hard to scale this cos 

LoggingFactory: Access to static method across operators

2018-07-24 Thread Jayant Ameta
I am using a custom LoggingFactory. Is there a way to provide access to
this custom LoggingFactory to all the operators other than adding it to all
constructors?

This is somewhat related to:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Adding-Context-To-Logs-td7351.html


Jayant


Re: Implement Joins with Lookup Data

2018-07-24 Thread Elias Levy
Alas, this suffers from the bootstrap problem.  At the moment Flink does not
allow you to pause a source (the positions), so you can't fully consume
and preload the accounts or products to perform the join before the
positions start flowing.  Additionally, Flink SQL does not support
materializing an upsert table for the accounts or products to perform the
join, so you have to develop your own KeyedProcessFunction, maintain the
state, and perform the join on your own if you only want to join against
the latest value for each key.
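
One hedged way to handle the bootstrap problem described above, without pausing the source, is to buffer early positions in keyed state until reference data for their key has arrived. The sketch below uses a CoProcessFunction on two keyed, connected streams; Account, Position, and EnrichedPosition are hypothetical types.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingEnrichment
        extends CoProcessFunction<Account, Position, EnrichedPosition> {

    private transient ValueState<Account> latestAccount;
    private transient ListState<Position> pendingPositions;

    @Override
    public void open(Configuration parameters) {
        latestAccount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-account", Account.class));
        pendingPositions = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-positions", Position.class));
    }

    @Override
    public void processElement1(Account account, Context ctx,
                                Collector<EnrichedPosition> out) throws Exception {
        latestAccount.update(account);
        // Flush positions that were waiting for this key's reference data.
        Iterable<Position> pending = pendingPositions.get();
        if (pending != null) {
            for (Position p : pending) {
                out.collect(new EnrichedPosition(p, account));
            }
        }
        pendingPositions.clear();
    }

    @Override
    public void processElement2(Position position, Context ctx,
                                Collector<EnrichedPosition> out) throws Exception {
        Account account = latestAccount.value();
        if (account != null) {
            out.collect(new EnrichedPosition(position, account));
        } else {
            pendingPositions.add(position);   // reference data not seen yet for this key
        }
    }
}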

On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann  wrote:

> Yes, using Kafka which you initialize with the initial values and then
> feed changes to the Kafka topic from which you consume could be a solution.
>
> On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
>> Hi Till,
>>
>> How would we do the initial hydration of the Product and Account data
>> since it’s currently in a relational DB? Do we have to copy over data to
>> Kafka and then use them?
>>
>> Regards,
>> Harsh
>>
>> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:
>>
>>> Hi Harshvardhan,
>>>
>>> I agree with Ankit that this problem could actually be solved quite
>>> elegantly with Flink's state. If you can ingest the product/account
>>> information changes as a stream, you can keep the latest version of it in
>>> Flink state by using a co-map function [1, 2]. One input of the co-map
>>> function would be the product/account update stream which updates the
>>> respective entries in Flink's state and the other input stream is the one
>>> to be enriched. When receiving input from this stream one would lookup the
>>> latest information contained in the operator's state and join it with the
>>> incoming event.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
>>> harshvardhan.ag...@gmail.com> wrote:
>>>
 Hi,

 Thanks for your responses.

 There is no fixed interval for the data being updated. It’s more like
 whenever you onboard a new product or there are any mandates that change
 will trigger the reference data to change.

 It’s not just the enrichment we are doing here. Once we have enriched
 the data we will be performing a bunch of aggregations using the enriched
 data.

 Which approach would you recommend?

 Regards,
 Harshvardhan

 On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:

> How often is the product db updated? Based on that you can store
> product metadata as state in Flink, maybe setup the state on cluster
> startup and then update daily etc.
>
>
>
> Also, just based on this feature, flink doesn’t seem to add a lot of
> value on top of Kafka. As Jorn said below, you can very well store all the
> events in an external store and then periodically run a cron to enrich
> later since your processing doesn’t seem to require absolute real time.
>
>
>
> Thanks
>
> Ankit
>
>
>
> *From: *Jörn Franke 
> *Date: *Monday, July 23, 2018 at 10:10 PM
> *To: *Harshvardhan Agrawal 
> *Cc: *
> *Subject: *Re: Implement Joins with Lookup Data
>
>
>
> For the first one (lookup of single entries) you could use a NoSQL db
> (eg key value store) - a relational database will not scale.
>
>
>
> Depending on when you need to do the enrichment you could also first
> store the data and enrich it later as part of a batch process.
>
>
> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi,
>
>
>
> We are using Flink for financial data enrichment and aggregations. We
> have Positions data that we are currently receiving from Kafka. We want to
> enrich that data with reference data like Product and Account information
> that is present in a relational database. From my understanding of Flink 
> so
> far I think there are two ways to achieve this. Here are two ways to do 
> it:
>
>
>
> 1) First Approach:
>
> a) Get positions from Kafka and key by product key.
>
> b) Perform lookup from the database for each key and then obtain
> Tuple2
>
>
>
> 2) Second Approach:
>
> a) Get positions from Kafka and key by product key.
>
> b) Window the keyed stream into say 15 seconds each.
>
> c) For each window get the unique product keys and perform a single
> lookup.
>
> d) Somehow join Positions and Products
>
>
>
> In the first approach we will be making a lot of calls to the DB and
> the solution is very chatty. Its 

Re: Flink 1.5 batch job fails to start

2018-07-24 Thread vino yang
Hi Alex,

Based on your log information, the likely cause is a Hadoop version
mismatch; the exception appears to come from mixing different Hadoop
versions. I suggest you align the Hadoop versions on both sides.

You can:

1. Upgrade the Hadoop version that the Flink cluster depends on; Flink's
official website provides a binary built against Hadoop 2.8 [1].
2. Downgrade your fat jar's Hadoop client dependency to match the Flink
cluster's Hadoop version.

[1]:
http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz

Thanks, vino.

2018-07-24 22:59 GMT+08:00 Alex Vinnik :

> Hi Till,
>
> Thanks for responding. Below is entrypoint logs. One thing I noticed that
> "Hadoop version: 2.7.3". My fat jar is built with hadoop 2.8 client. Could
> it be a reason for that error? If so how can i use same hadoop version 2.8
> on flink server side?  BTW job runs fine locally reading from the same s3a
> buckets when executed using createLocalEnvironment via java -jar my-fat.jar
> --input s3a://foo --output s3a://bar
>
> Regarding java version. The job is submitted via Flink UI, so it should
> not be a problem.
>
> Thanks a lot in advance.
>
> 2018-07-24T12:09:38.083+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  
> 
> 2018-07-24T12:09:38.085+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0,
> Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
> 2018-07-24T12:09:38.085+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   OS current user: flink
> 2018-07-24T12:09:38.844+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Current Hadoop/Kerberos user: flink
> 2018-07-24T12:09:38.844+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
> 2018-07-24T12:09:38.844+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Maximum heap size: 1963 MiBytes
> 2018-07-24T12:09:38.844+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   JAVA_HOME: /docker-java-home/jre
> 2018-07-24T12:09:38.851+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Hadoop version: 2.7.3
> 2018-07-24T12:09:38.851+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   JVM Options:
> 2018-07-24T12:09:38.851+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  -Xms2048m
> 2018-07-24T12:09:38.851+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  -Xmx2048m
> 2018-07-24T12:09:38.851+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.
> disableCertChecking
> 2018-07-24T12:09:38.851+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  -Dcom.amazonaws.sdk.disableCertChecking
> 2018-07-24T12:09:38.851+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,
> address=5015
> 2018-07-24T12:09:38.851+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.
> properties
> 2018-07-24T12:09:38.851+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  -Dlogback.configurationFile=file:/opt/flink/conf/logback-
> console.xml
> 2018-07-24T12:09:38.851+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Program Arguments:
> 2018-07-24T12:09:38.852+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  --configDir
> 2018-07-24T12:09:38.852+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  /opt/flink/conf
> 2018-07-24T12:09:38.852+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  --executionMode
> 2018-07-24T12:09:38.853+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  cluster
> 2018-07-24T12:09:38.853+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  --host
> 2018-07-24T12:09:38.853+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  cluster
> 2018-07-24T12:09:38.853+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO   Classpath: /opt/flink/lib/flink-metrics-
> datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.
> jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/
> flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/
> flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.
> 7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
> 2018-07-24T12:09:38.853+ 
> [org.apache.flink.runtime.entrypoint.ClusterEntrypoint]
> INFO  
> 
> 2018-07-24T12:09:38.854+ 
> 

Re: Flink 1.5 batch job fails to start

2018-07-24 Thread Alex Vinnik
Hi Till,

Thanks for responding. Below are the entrypoint logs. One thing I noticed is
"Hadoop version: 2.7.3", while my fat jar is built with the Hadoop 2.8 client.
Could that be the reason for the error? If so, how can I use the same Hadoop
version 2.8 on the Flink server side? BTW, the job runs fine locally, reading
from the same s3a buckets, when executed using createLocalEnvironment via
java -jar my-fat.jar --input s3a://foo --output s3a://bar

Regarding the Java version: the job is submitted via the Flink UI, so that
should not be a problem.

Thanks a lot in advance.

2018-07-24T12:09:38.083+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO

2018-07-24T12:09:38.085+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting
StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108,
Date:24.05.2018 @ 14:54:44 UTC)
2018-07-24T12:09:38.085+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current
user: flink
2018-07-24T12:09:38.844+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current
Hadoop/Kerberos user: flink
2018-07-24T12:09:38.844+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK
64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-24T12:09:38.844+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap
size: 1963 MiBytes
2018-07-24T12:09:38.844+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME:
/docker-java-home/jre
2018-07-24T12:09:38.851+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop
version: 2.7.3
2018-07-24T12:09:38.851+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
2018-07-24T12:09:38.851+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  -Xms2048m
2018-07-24T12:09:38.851+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  -Xmx2048m
2018-07-24T12:09:38.851+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dcom.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-24T12:09:38.851+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-24T12:09:38.851+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-24T12:09:38.851+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program
Arguments:
2018-07-24T12:09:38.852+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--configDir
2018-07-24T12:09:38.852+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
/opt/flink/conf
2018-07-24T12:09:38.852+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO
--executionMode
2018-07-24T12:09:38.853+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  cluster
2018-07-24T12:09:38.853+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --host
2018-07-24T12:09:38.853+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  cluster
2018-07-24T12:09:38.853+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath:
/opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-24T12:09:38.853+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO

2018-07-24T12:09:38.854+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered
UNIX signal handlers for [TERM, HUP, INT]
2018-07-24T12:09:38.895+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting
StandaloneSessionClusterEntrypoint.
2018-07-24T12:09:38.895+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
default filesystem.
2018-07-24T12:09:38.927+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install
security context.
2018-07-24T12:09:39.034+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing
cluster services.
2018-07-24T12:09:39.059+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to
start actor system at flink-jobmanager:6123
2018-07-24T12:09:40.335+
[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system
started at 

Re: Implement Joins with Lookup Data

2018-07-24 Thread Till Rohrmann
Yes, using Kafka could be a solution: initialize a topic with the initial
values, then feed changes into that topic and have the job consume from it.
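
A hedged sketch of that initial hydration step: a one-off loader that copies the current contents of the reference table into the Kafka topic the Flink job consumes. All connection details, table, column, and topic names, and the JSON-ish value format are assumptions.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProductTableToKafka {

    public static void main(String[] args) throws Exception {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://db-host/refdata", "user", "password");  // hypothetical
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT product_key, name, category FROM product");
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {

            while (rs.next()) {
                String key = rs.getString("product_key");
                String value = String.format("{\"name\":\"%s\",\"category\":\"%s\"}",
                        rs.getString("name"), rs.getString("category"));
                // Keyed by product_key so later change events for the same product
                // land in the same partition and supersede the initial value.
                producer.send(new ProducerRecord<>("product-updates", key, value));
            }
            producer.flush();
        }
    }
}

Ongoing changes would then be written to the same topic (for example by the service that updates the table, or via CDC), so the Flink job sees the initial snapshot followed by the change stream.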

On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal <
harshvardhan.ag...@gmail.com> wrote:

> Hi Till,
>
> How would we do the initial hydration of the Product and Account data
> since it’s currently in a relational DB? Do we have to copy over data to
> Kafka and then use them?
>
> Regards,
> Harsh
>
> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:
>
>> Hi Harshvardhan,
>>
>> I agree with Ankit that this problem could actually be solved quite
>> elegantly with Flink's state. If you can ingest the product/account
>> information changes as a stream, you can keep the latest version of it in
>> Flink state by using a co-map function [1, 2]. One input of the co-map
>> function would be the product/account update stream which updates the
>> respective entries in Flink's state and the other input stream is the one
>> to be enriched. When receiving input from this stream one would lookup the
>> latest information contained in the operator's state and join it with the
>> incoming event.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>>
>> Cheers,
>> Till
>>
>> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
>> harshvardhan.ag...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for your responses.
>>>
>>> There is no fixed interval for the data being updated. It’s more like
>>> whenever you onboard a new product or there are any mandates that change
>>> will trigger the reference data to change.
>>>
>>> It’s not just the enrichment we are doing here. Once we have enriched
>>> the data we will be performing a bunch of aggregations using the enriched
>>> data.
>>>
>>> Which approach would you recommend?
>>>
>>> Regards,
>>> Harshvardhan
>>>
>>> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:
>>>
 How often is the product db updated? Based on that you can store
 product metadata as state in Flink, maybe setup the state on cluster
 startup and then update daily etc.



 Also, just based on this feature, flink doesn’t seem to add a lot of
 value on top of Kafka. As Jorn said below, you can very well store all the
 events in an external store and then periodically run a cron to enrich
 later since your processing doesn’t seem to require absolute real time.



 Thanks

 Ankit



 *From: *Jörn Franke 
 *Date: *Monday, July 23, 2018 at 10:10 PM
 *To: *Harshvardhan Agrawal 
 *Cc: *
 *Subject: *Re: Implement Joins with Lookup Data



 For the first one (lookup of single entries) you could use a NoSQL db
 (eg key value store) - a relational database will not scale.



 Depending on when you need to do the enrichment you could also first
 store the data and enrich it later as part of a batch process.


 On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
 harshvardhan.ag...@gmail.com> wrote:

 Hi,



 We are using Flink for financial data enrichment and aggregations. We
 have Positions data that we are currently receiving from Kafka. We want to
 enrich that data with reference data like Product and Account information
 that is present in a relational database. From my understanding of Flink so
 far I think there are two ways to achieve this. Here are two ways to do it:



 1) First Approach:

 a) Get positions from Kafka and key by product key.

 b) Perform lookup from the database for each key and then obtain
 Tuple2



 2) Second Approach:

 a) Get positions from Kafka and key by product key.

 b) Window the keyed stream into say 15 seconds each.

 c) For each window get the unique product keys and perform a single
 lookup.

 d) Somehow join Positions and Products



 In the first approach we will be making a lot of calls to the DB and
 the solution is very chatty. Its hard to scale this cos the database
 storing the reference data might not be very responsive.



 In the second approach, I wish to join the WindowedStream with the
 SingleOutputStream and turns out I can't join a windowed stream. So I am
 not quite sure how to do that.



 I wanted an opinion for what is the right thing to do. Should I go with
 the first approach or the second one. If the second one, how can I
 implement the join?



 --


 *Regards, Harshvardhan Agrawal*

 --
>>> Regards,
>>> Harshvardhan
>>>
>> --
> Regards,
> Harshvardhan
>


Re: Implement Joins with Lookup Data

2018-07-24 Thread Harshvardhan Agrawal
Hi Till,

How would we do the initial hydration of the Product and Account data, since
it’s currently in a relational DB? Do we have to copy the data over to Kafka
and then use it from there?

Regards,
Harsh

On Tue, Jul 24, 2018 at 09:22 Till Rohrmann  wrote:

> Hi Harshvardhan,
>
> I agree with Ankit that this problem could actually be solved quite
> elegantly with Flink's state. If you can ingest the product/account
> information changes as a stream, you can keep the latest version of it in
> Flink state by using a co-map function [1, 2]. One input of the co-map
> function would be the product/account update stream which updates the
> respective entries in Flink's state and the other input stream is the one
> to be enriched. When receiving input from this stream one would lookup the
> latest information contained in the operator's state and join it with the
> incoming event.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
>
> Cheers,
> Till
>
> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for your responses.
>>
>> There is no fixed interval for the data being updated. It’s more like
>> whenever you onboard a new product or there are any mandates that change
>> will trigger the reference data to change.
>>
>> It’s not just the enrichment we are doing here. Once we have enriched the
>> data we will be performing a bunch of aggregations using the enriched data.
>>
>> Which approach would you recommend?
>>
>> Regards,
>> Harshvardhan
>>
>> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:
>>
>>> How often is the product db updated? Based on that you can store product
>>> metadata as state in Flink, maybe setup the state on cluster startup and
>>> then update daily etc.
>>>
>>>
>>>
>>> Also, just based on this feature, flink doesn’t seem to add a lot of
>>> value on top of Kafka. As Jorn said below, you can very well store all the
>>> events in an external store and then periodically run a cron to enrich
>>> later since your processing doesn’t seem to require absolute real time.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Ankit
>>>
>>>
>>>
>>> *From: *Jörn Franke 
>>> *Date: *Monday, July 23, 2018 at 10:10 PM
>>> *To: *Harshvardhan Agrawal 
>>> *Cc: *
>>> *Subject: *Re: Implement Joins with Lookup Data
>>>
>>>
>>>
>>> For the first one (lookup of single entries) you could use a NoSQL db
>>> (eg key value store) - a relational database will not scale.
>>>
>>>
>>>
>>> Depending on when you need to do the enrichment you could also first
>>> store the data and enrich it later as part of a batch process.
>>>
>>>
>>> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
>>> harshvardhan.ag...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>>
>>>
>>> We are using Flink for financial data enrichment and aggregations. We
>>> have Positions data that we are currently receiving from Kafka. We want to
>>> enrich that data with reference data like Product and Account information
>>> that is present in a relational database. From my understanding of Flink so
>>> far I think there are two ways to achieve this. Here are two ways to do it:
>>>
>>>
>>>
>>> 1) First Approach:
>>>
>>> a) Get positions from Kafka and key by product key.
>>>
>>> b) Perform lookup from the database for each key and then obtain
>>> Tuple2
>>>
>>>
>>>
>>> 2) Second Approach:
>>>
>>> a) Get positions from Kafka and key by product key.
>>>
>>> b) Window the keyed stream into say 15 seconds each.
>>>
>>> c) For each window get the unique product keys and perform a single
>>> lookup.
>>>
>>> d) Somehow join Positions and Products
>>>
>>>
>>>
>>> In the first approach we will be making a lot of calls to the DB and the
>>> solution is very chatty. Its hard to scale this cos the database storing
>>> the reference data might not be very responsive.
>>>
>>>
>>>
>>> In the second approach, I wish to join the WindowedStream with the
>>> SingleOutputStream and turns out I can't join a windowed stream. So I am
>>> not quite sure how to do that.
>>>
>>>
>>>
>>> I wanted an opinion for what is the right thing to do. Should I go with
>>> the first approach or the second one. If the second one, how can I
>>> implement the join?
>>>
>>>
>>>
>>> --
>>>
>>>
>>> *Regards, Harshvardhan Agrawal*
>>>
>>> --
>> Regards,
>> Harshvardhan
>>
> --
Regards,
Harshvardhan


[ANNOUNCE] Weekly community update #30

2018-07-24 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #30. Please post any news and
updates you want to share with the community to this thread.

# First RC for Flink 1.6.0

The community has published the first release candidate for Flink 1.6.0 [1].
Please help the community by trying the RC out and reporting potential
problems with it.

# First RC for Flink 1.5.2

The community has published the first release candidate for Flink 1.5.2 [2].
This minor release contains several bug fixes, which you can find here [3].
The community would highly appreciate it if you could try this RC out.

# Program for Flink Forward Berlin 2018

The program for Flink Forward Berlin 2018 has been announced [4]. As every
year, Flink Forward Berlin is a great opportunity for the community to meet
each other in person, learn new things about Flink and its use cases and to
discuss the project's future direction.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-6-0-release-candidate-1-tp23440.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-5-2-release-candidate-1-tp23446.html
[3]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12343588
[4] https://berlin-2018.flink-forward.org/conference-program

Cheers,
Till


Re: Implement Joins with Lookup Data

2018-07-24 Thread Till Rohrmann
Hi Harshvardhan,

I agree with Ankit that this problem could actually be solved quite
elegantly with Flink's state. If you can ingest the product/account
information changes as a stream, you can keep the latest version of it in
Flink state by using a co-map function [1, 2]. One input of the co-map
function would be the product/account update stream which updates the
respective entries in Flink's state and the other input stream is the one
to be enriched. When receiving input from this stream one would lookup the
latest information contained in the operator's state and join it with the
incoming event.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
[2]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
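
A minimal sketch of this connected-streams enrichment could look as follows
(the Position/Product types, field names, and the keyBy on the product key are
illustrative assumptions, not code from this thread):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

case class Position(productKey: String, quantity: Long)
case class Product(productKey: String, name: String)
case class EnrichedPosition(position: Position, product: Product)

class EnrichmentFunction
    extends RichCoFlatMapFunction[Product, Position, EnrichedPosition] {

  // keyed state holding the latest reference data for the current product key
  private lazy val productState: ValueState[Product] = getRuntimeContext.getState(
    new ValueStateDescriptor[Product]("product", classOf[Product]))

  // reference-data stream: overwrite the stored product for this key
  override def flatMap1(product: Product, out: Collector[EnrichedPosition]): Unit =
    productState.update(product)

  // position stream: look up the stored product and emit the enriched record
  override def flatMap2(position: Position, out: Collector[EnrichedPosition]): Unit = {
    val product = productState.value()
    if (product != null) out.collect(EnrichedPosition(position, product))
    // else: buffer, drop, or emit un-enriched -- application specific
  }
}

// usage: key both streams by the product key so they share the same state partition
// productUpdates.connect(positions).keyBy(_.productKey, _.productKey)
//   .flatMap(new EnrichmentFunction)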

Cheers,
Till

On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal <
harshvardhan.ag...@gmail.com> wrote:

> Hi,
>
> Thanks for your responses.
>
> There is no fixed interval for the data being updated. It’s more like
> whenever you onboard a new product or there are any mandates that change
> will trigger the reference data to change.
>
> It’s not just the enrichment we are doing here. Once we have enriched the
> data we will be performing a bunch of aggregations using the enriched data.
>
> Which approach would you recommend?
>
> Regards,
> Harshvardhan
>
> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:
>
>> How often is the product db updated? Based on that you can store product
>> metadata as state in Flink, maybe setup the state on cluster startup and
>> then update daily etc.
>>
>>
>>
>> Also, just based on this feature, flink doesn’t seem to add a lot of
>> value on top of Kafka. As Jorn said below, you can very well store all the
>> events in an external store and then periodically run a cron to enrich
>> later since your processing doesn’t seem to require absolute real time.
>>
>>
>>
>> Thanks
>>
>> Ankit
>>
>>
>>
>> *From: *Jörn Franke 
>> *Date: *Monday, July 23, 2018 at 10:10 PM
>> *To: *Harshvardhan Agrawal 
>> *Cc: *
>> *Subject: *Re: Implement Joins with Lookup Data
>>
>>
>>
>> For the first one (lookup of single entries) you could use a NoSQL db (eg
>> key value store) - a relational database will not scale.
>>
>>
>>
>> Depending on when you need to do the enrichment you could also first
>> store the data and enrich it later as part of a batch process.
>>
>>
>> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
>> harshvardhan.ag...@gmail.com> wrote:
>>
>> Hi,
>>
>>
>>
>> We are using Flink for financial data enrichment and aggregations. We
>> have Positions data that we are currently receiving from Kafka. We want to
>> enrich that data with reference data like Product and Account information
>> that is present in a relational database. From my understanding of Flink so
>> far I think there are two ways to achieve this. Here are two ways to do it:
>>
>>
>>
>> 1) First Approach:
>>
>> a) Get positions from Kafka and key by product key.
>>
>> b) Perform lookup from the database for each key and then obtain
>> Tuple2
>>
>>
>>
>> 2) Second Approach:
>>
>> a) Get positions from Kafka and key by product key.
>>
>> b) Window the keyed stream into say 15 seconds each.
>>
>> c) For each window get the unique product keys and perform a single
>> lookup.
>>
>> d) Somehow join Positions and Products
>>
>>
>>
>> In the first approach we will be making a lot of calls to the DB and the
>> solution is very chatty. Its hard to scale this cos the database storing
>> the reference data might not be very responsive.
>>
>>
>>
>> In the second approach, I wish to join the WindowedStream with the
>> SingleOutputStream and turns out I can't join a windowed stream. So I am
>> not quite sure how to do that.
>>
>>
>>
>> I wanted an opinion for what is the right thing to do. Should I go with
>> the first approach or the second one. If the second one, how can I
>> implement the join?
>>
>>
>>
>> --
>>
>>
>> *Regards, Harshvardhan Agrawal*
>>
>> --
> Regards,
> Harshvardhan
>


Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Fabian Hueske
Hi,

The problem is that Flink tracks which files it has read by remembering the
modification time of the file that was added (or modified) last.
We use the modification time to avoid having to remember the names of all
files that were ever consumed, which would be expensive to check and store
over time.

One could change this logic to a hybrid approach that keeps the names of
all files that have a mod timestamp that is larger than the max mod time
minus an offset.
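
As a rough sketch of that hybrid bookkeeping (illustrative only, not the actual
file-monitoring code in Flink):

import scala.collection.mutable

class ProcessedFileTracker(offsetMs: Long) {
  private var maxModTime: Long = Long.MinValue
  // names of files whose modification time falls inside the tracking window
  private val recentFiles = mutable.Map[String, Long]()

  /** Returns true if the file has not been seen yet and should be read. */
  def shouldProcess(path: String, modTime: Long): Boolean = {
    if (modTime < maxModTime - offsetMs) false   // older than the window: assume already processed
    else if (recentFiles.contains(path)) false   // already processed within the window
    else {
      recentFiles += path -> modTime
      if (modTime > maxModTime) maxModTime = modTime
      // prune entries that fell out of the window
      recentFiles.retain((_, t) => t >= maxModTime - offsetMs)
      true
    }
  }
}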

It would be great if you could open a Jira issue for this problem.

Thanks, Fabian


2018-07-24 14:58 GMT+02:00 Averell :

> Hello Jörn.
>
> Thanks for your help.
> "/Probably the system is putting them to the folder and Flink is triggered
> before they are consistent./" <<< yes, I also guess so. However, if Flink
> is
> triggered before they are consistent, either (a) there should be some error
> messages, or (b) Flink should be able to identify those files in the
> subsequent triggers. But in my case, those files are missed forever.
>
> Right now those files for S3 are to be consumed by Flink only. The flow is
> as follow:
>Existing system >>> S3 >>> Flink >>> Elastic Search.
> If I cannot find a solution to the mentioned problem, I might need to
> change
> to:
>Existing system >>> Kinesis >>> Flink >>> Elastic Search
> Or
>Existing system >>> S3 >>> Kinesis >>> Flink >>> Elastic
> Search
> Or
>Existing system >>> S3 >>> Custom File Source + Flink >>>
> Elastic
> Search
> However, all those solutions would take much more effort.
>
> Thanks!
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Averell
Hello Jörn.

Thanks for your help.
"/Probably the system is putting them to the folder and Flink is triggered
before they are consistent./" <<< yes, I also guess so. However, if Flink is
triggered before they are consistent, either (a) there should be some error
messages, or (b) Flink should be able to identify those files in the
subsequent triggers. But in my case, those files are missed forever.

Right now those files for S3 are to be consumed by Flink only. The flow is
as follows:
   Existing system >>> S3 >>> Flink >>> Elastic Search.
If I cannot find a solution to the mentioned problem, I might need to change
to:
   Existing system >>> Kinesis >>> Flink >>> Elastic Search
Or
   Existing system >>> S3 >>> Kinesis >>> Flink >>> Elastic Search  
Or
   Existing system >>> S3 >>> Custom File Source + Flink >>> Elastic
Search
However, all those solutions would take much more effort.

Thanks!




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Memory Logging

2018-07-24 Thread Till Rohrmann
Hi Oliver,

which Flink image are you using? If you are using the docker image from
docker hub [1], then the memory logging will go to stdout and not to a log
file. The reason for this behavior is that the docker image configures the
logger to print to stdout such that one can easily access the logs via
`docker logs`. If this is the case, then you should find the memory logging
statements somewhere in the console output.

[1] https://hub.docker.com/r/_/flink/

Cheers,
Till

On Tue, Jul 24, 2018 at 1:43 PM Oliver Breit  wrote:

> Hi everyone,
>
> We are using a simple Flink setup with one jobmanager and one taskmanager
> running inside a docker container. We are having issues enabling the 
> *taskmanager.debug.memory.startLogThread
> *setting. We added
> *taskmanager.debug.memory.startLogThread: true*
> *taskmanager.debug.memory.logIntervalMs: 1000*
> to our flink conf (/opt/flink/conf/flink-conf.yaml).
>
> The console prints
> *2018-07-24 09:53:42,497 INFO
> org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.debug.memory.startLogThread, true*
> (similarly for logIntervalMs). So it seems that those values are being
> read.
>
> In the log folder, there is only a flink--client-bc7592c47b7b.log file
> with no relevant information. We don't see any task- or jobmanager logs.
> We've tried adding the env.log.dir and taskmanager.log.path directly to
> flink-conf.yaml. We've also added custom log4j.properties to
> /opt/flink/conf/ without any apparent success.
>
> Is there something obvious that I am missing?
>
> Let me know if you need more information.
>
> Thanks!
>
> Best,
> Oliver
>
>
>


Re: SingleOutputStreamOperator vs DataStream?

2018-07-24 Thread Till Rohrmann
Hi Chris,

a `DataStream` represents a stream of events which have the same type. A
`SingleOutputStreamOperator` is a subclass of `DataStream` and represents a
user defined transformation applied to an input `DataStream` and producing
an output `DataStream` (represented by itself). Since you can only add a
side output to an operator/user defined transformation, you can only access
the side output data from a `SingleOutputStreamOperator` and not from a
`DataStream`. In this regard, the `SingleOutputStreamOperator` is just a
richer version of the `DataStream` which requires a certain context.

Cheers,
Till

On Tue, Jul 24, 2018 at 1:26 PM chrisr123  wrote:

>
> I'm trying to get a list of late elements in my Tumbling Windows
> application
> and I noticed
> that I need to use SingleOutputStreamOperator instead of DataStream
> to
> get
> access to the .sideOutputLateData(...) method.
>
> Can someone explain what the difference is between
> SingleOutputStreamOperator and DataStream
> and why I need to use this for getting the late data?
> Thanks!
>
> Snippet:
> OutputTag<EventBean> lateEventsTag = new OutputTag<EventBean>("late-events") {};
> SingleOutputStreamOperator<EventBean> windowedEvents = eventStream
>     .keyBy("key")
>     .window(TumblingEventTimeWindows.of(Time.seconds(3)))
>     .sideOutputLateData(lateEventsTag)
>     .process(new EventBeanProcessWindowFunction());
> DataStream<EventBean> lateEvents = windowedEvents.getSideOutput(lateEventsTag);
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Questions on Unbounded number of keys

2018-07-24 Thread Till Rohrmann
Hi Chang Liu,

if you are dealing with an unlimited number of keys and keep state around
for every key, then your state size will keep growing with the number of
keys. If you are using the FileStateBackend which keeps state in memory,
you will eventually run into an OutOfMemoryException. One way to
solve/mitigate this problem is to use the RocksDBStateBackend which can go
out of core.

Alternatively, you would need to clean up your state before you run out of
memory. One way to do this is to register for every key a timer which
clears the state. But this only works if you don't amass too much state
data before the timer is triggered. If you wish, this solution is some kind
of a poor man's state TTL. The Flink community is currently developing a
proper implementation of it which does not rely on additional timers (which
increase the state footprint) [1].

[1] https://issues.apache.org/jira/browse/FLINK-9510
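
A minimal sketch of the timer-based cleanup described above (the HttpRequest /
SessionInfo types and the retention period are illustrative assumptions):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

case class HttpRequest(sessionId: String, uri: String)
case class SessionInfo(requestCount: Long)

class SessionStateWithCleanup(retentionMs: Long)
    extends KeyedProcessFunction[String, HttpRequest, SessionInfo] {

  private lazy val session: ValueState[SessionInfo] = getRuntimeContext.getState(
    new ValueStateDescriptor[SessionInfo]("session", classOf[SessionInfo]))
  // timestamp of the most recently registered cleanup timer for this key
  private lazy val cleanupAt: ValueState[java.lang.Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[java.lang.Long]("cleanup-at", classOf[java.lang.Long]))

  override def processElement(
      req: HttpRequest,
      ctx: KeyedProcessFunction[String, HttpRequest, SessionInfo]#Context,
      out: Collector[SessionInfo]): Unit = {
    val updated = SessionInfo(Option(session.value()).map(_.requestCount).getOrElse(0L) + 1)
    session.update(updated)
    out.collect(updated)

    // push the cleanup deadline further into the future on every element
    val deadline = ctx.timerService().currentProcessingTime() + retentionMs
    ctx.timerService().registerProcessingTimeTimer(deadline)
    cleanupAt.update(deadline)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, HttpRequest, SessionInfo]#OnTimerContext,
      out: Collector[SessionInfo]): Unit = {
    // only the most recent timer clears the state; stale timers are ignored
    val deadline = cleanupAt.value()
    if (deadline != null && deadline.longValue() == timestamp) {
      session.clear()
      cleanupAt.clear()
    }
  }
}

// usage: requestStream.keyBy(_.sessionId).process(new SessionStateWithCleanup(30 * 60 * 1000L))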

Cheers,
Till

On Tue, Jul 24, 2018 at 10:11 AM Chang Liu  wrote:

> Dear All,
>
> I have questions regarding the keys. In general, the questions are:
>
>- what happens if I am doing keyBy based on unlimited number of keys?
>How Flink is managing each KeyedStream under the hood? Will I get memory
>overflow, for example, if every KeyStream associated with a specific key is
>taking certain amount of memory?
>- BTW, I think it is fare to say that, I have to clear my KeyedState
>so that the memory used by these State are cleaned up regularly. But still,
>I am wondering, even though I am regularly cleaning up State memory, what
>happened to memory used by the KeyedStream itself, if there is? And will
>they be exploding?
>
>
> Let me give an example for understanding it clearly.  Let’s say we have a
>
> val requestStream: DataStream[HttpRequest]
>
> which is a stream of HTTP requests. And by using the session ID as the
> key, we can obtain a KeyedStream per single session, as following:
>
> val streamPerSession: KeyedStream[HttpRequest] =
> requestStream.keyBy(_.sessionId)
>
> However, the session IDs are actually a hashcode generated randomly by the
> Web service/application, so that means, the number of sessions are
> unlimited (which is reasonable, because every time a user open the
> application or login, he/she will get a new unique session).
>
> Then, the question is: will Flink eventually run out of memory because the
> number of sessions are unlimited (and because we are keying by the session
> ID)?
>
>- If so, how can we properly manage this situation?
>- If not, could you help me understand WHY?
>- Let’s also assume that, we are regularly clearing the KeyedState, so
>the memory used by the State will not explode.
>
>
>
> Many Thanks and Looking forward to your reply :)
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>
>


Re: streaming predictions

2018-07-24 Thread Andrea Spina
Dear Cederic,
I did something similar to yours a while ago as part of this work [1], but I've
always been working within the batch context. I'm also the co-author of
flink-jpmml and, since a flink2pmml model saver library doesn't currently
exist, I'd suggest a twofold strategy to tackle this problem:
- if your model is relatively simple, take the batch evaluate method (it
belongs to your SVM classifier) and attempt to translate it into a flatMap
function (hopefully you can reuse some internal utilities; Flink uses the
breeze vector library under the hood [3]) - see the sketch after the
references below.
- if your model is a complex one, you should export the model into PMML and
then employ [2]. For a first overview, [4] is the library you should adopt to
export your model, and [5] can help you with the related implementation.

Hope it can help and good luck!

Andrea

[1] https://dl.acm.org/citation.cfm?id=3070612
[2] https://github.com/FlinkML/flink-jpmml
[3]
https://github.com/apache/flink/blob/7034e9cfcb051ef90c5bf0960bfb50a79b3723f0/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L73
[4] https://github.com/jpmml/jpmml-model
[5] https://github.com/jpmml/jpmml-sparkml
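
For the first strategy with a plain linear SVM, a minimal sketch could look
like this (it reuses the names dataStream and modelSvm from Cederic's snippets
and assumes the three Ints are exactly the features the model was trained on;
intercept and feature scaling are ignored):

import org.apache.flink.ml.math.DenseVector
import org.apache.flink.streaming.api.scala._

// batch side: materialise the trained weight vector on the client
// (modelSvm is the DataSet[DenseVector] obtained from svm2.weightsOption.get)
val weights: DenseVector = modelSvm.collect().head

// streaming side: score each record with the decision function sign(w . x)
val predictions: DataStream[((Int, Int, Int), Double)] = dataStream.map { x =>
  val raw = weights(0) * x._1 + weights(1) * x._2 + weights(2) * x._3
  (x, if (raw >= 0) 1.0 else -1.0)
}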

2018-07-24 13:29 GMT+02:00 David Anderson :

> One option (which I haven't tried myself) would be to somehow get the
> model into PMML format, and then use https://github.com/
> FlinkML/flink-jpmml to score the model. You could either use another
> machine learning framework to train the model (i.e., a framework that
> directly supports PMML export), or convert the Flink model into PMML. Since
> SVMs are fairly simple to describe, that might not be terribly difficult.
>
> On Mon, Jul 23, 2018 at 4:18 AM Xingcan Cui  wrote:
>
>> Hi Cederic,
>>
>> If the model is a simple function, you can just load it and make
>> predictions using the map/flatMap function in the StreamEnvironment.
>>
>> But I’m afraid the model trained by Flink-ML should be a “batch job",
>> whose predict method takes a Dataset as the parameter and outputs another
>> Dataset as the result. That means you cannot easily apply the model on
>> streams, at least for now.
>>
>> There are two options to solve this. (1) Train the dataset using another
>> framework to produce a simple function. (2) Adjust your model serving as a
>> series of batch jobs.
>>
>> Hope that helps,
>> Xingcan
>>
>> On Jul 22, 2018, at 8:56 PM, Hequn Cheng  wrote:
>>
>> Hi Cederic,
>>
>> I am not familiar with SVM or machine learning but I think we can work it
>> out together.
>> What problem have you met when you try to implement this function? From
>> my point of view, we can rebuild the model in the flatMap function and use
>> it to predict the input data. There are some flatMap documents here[1].
>>
>> Best, Hequn
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-
>> master/dev/stream/operators/#datastream-transformations
>>
>>
>>
>>
>>
>> On Sun, Jul 22, 2018 at 4:12 PM, Cederic Bosmans 
>> wrote:
>>
>>> Dear
>>>
>>> My name is Cederic Bosmans and I am a masters student at the Ghent
>>> University (Belgium).
>>> I am currently working on my masters dissertation which involves Apache
>>> Flink.
>>>
>>> I want to make predictions in the streaming environment based on a model
>>> trained in the batch environment.
>>>
>>> I trained my SVM-model this way:
>>> val svm2 = SVM()
>>> svm2.setSeed(1)
>>> svm2.fit(trainLV)
>>> val testVD = testLV.map(lv => (lv.vector, lv.label))
>>> val evalSet = svm2.evaluate(testVD)
>>>
>>> and saved the model:
>>> val modelSvm = svm2.weightsOption.get
>>>
>>> Then I have an incoming datastream in the streaming environment:
>>> dataStream[(Int, Int, Int)]
>>> which should be bininary classified using this trained SVM model.
>>>
>>> Since the predict function does only support DataSet and not DataStream,
>>> on stackoverflow a flink contributor mentioned that this should be done
>>> using a map/flatMap function.
>>> Unfortunately I am not able to work this function out.
>>>
>>> It would be incredible for me if you could help me a little bit further!
>>>
>>> Kind regards and thanks in advance
>>> Cederic Bosmans
>>>
>>
>>
>>
>
> --
> *David Anderson* | Training Coordinator | data Artisans
> --
> Join Flink Forward - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
>



-- 
*Andrea Spina*
Software Engineer @ Radicalbit Srl
Via Borsieri 41, 20159, Milano - IT


Re: NoClassDefFoundError when running Twitter Example

2018-07-24 Thread Till Rohrmann
Hi Syed,

could you check whether this class is actually contained in the twitter
example jar? If not, then you have to build an uber jar containing all
required dependencies.

Cheers,
Till

On Tue, Jul 24, 2018 at 5:11 AM syed  wrote:

> I am facing the *java.lang.NoClassDefFoundError:
> com/twitter/hbc/httpclient/auth/Authentication*
> error when running the Twitter example.
>
> The example works well with the sample data, but I am unable to run it with
> real tweet data.
> Please guide me how to fix this issue. I am running Flink 1.3.2.
> Thanks
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink 1.5 batch job fails to start

2018-07-24 Thread Till Rohrmann
Hi Alex,

I'm not entirely sure what causes this problem because it is the first time
I see it.

First question would be if the problem also arises if using a different
Hadoop version.

Are you using the same Java versions on the client as well as on the server?

Could you provide us with the cluster entrypoint logs?

Cheers,
Till

On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik  wrote:

> Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink 1.5
> and getting a weird exception.
>
> Job reads json from s3a and writes parquet files to s3a with avro model.
> Job is uber jar file built with hadoop-aws-2.8.0 in order to have access to
> S3AFileSystem class.
>
> Fails here
>
> https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288
> with
> Caused by: java.lang.Exception: Deserializing the OutputFormat
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)
> failed: unread block data
>
> To be exact it fails right on that line.
>
> https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488
>
> Not sure how to resolve this problem. Looking for an advice. Let me know
> if more info is needed. Full stack is below. Thanks.
>
> org.apache.flink.runtime.rest.handler.RestHandlerException:
> org.apache.flink.util.FlinkException: Failed to submit job
> 13a1478cbc7ec20f93f9ee0947856bfd.
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkException: Failed to submit job
> 13a1478cbc7ec20f93f9ee0947856bfd.
> at
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> at
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> at
> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> at
> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
> ... 29 more
> Caused by: org.apache.flink.util.FlinkException: Failed to submit job
> 13a1478cbc7ec20f93f9ee0947856bfd.
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> 

Re: Implement Joins with Lookup Data

2018-07-24 Thread Harshvardhan Agrawal
Hi,

Thanks for your responses.

There is no fixed interval for the data being updated. It’s more that
onboarding a new product or a change in mandates will trigger the reference
data to change.

It’s not just the enrichment we are doing here. Once we have enriched the
data we will be performing a bunch of aggregations using the enriched data.

Which approach would you recommend?

Regards,
Harshvardhan

On Tue, Jul 24, 2018 at 04:04 Jain, Ankit  wrote:

> How often is the product db updated? Based on that you can store product
> metadata as state in Flink, maybe setup the state on cluster startup and
> then update daily etc.
>
>
>
> Also, just based on this feature, flink doesn’t seem to add a lot of value
> on top of Kafka. As Jorn said below, you can very well store all the events
> in an external store and then periodically run a cron to enrich later since
> your processing doesn’t seem to require absolute real time.
>
>
>
> Thanks
>
> Ankit
>
>
>
> *From: *Jörn Franke 
> *Date: *Monday, July 23, 2018 at 10:10 PM
> *To: *Harshvardhan Agrawal 
> *Cc: *
> *Subject: *Re: Implement Joins with Lookup Data
>
>
>
> For the first one (lookup of single entries) you could use a NoSQL db (eg
> key value store) - a relational database will not scale.
>
>
>
> Depending on when you need to do the enrichment you could also first store
> the data and enrich it later as part of a batch process.
>
>
> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <
> harshvardhan.ag...@gmail.com> wrote:
>
> Hi,
>
>
>
> We are using Flink for financial data enrichment and aggregations. We have
> Positions data that we are currently receiving from Kafka. We want to
> enrich that data with reference data like Product and Account information
> that is present in a relational database. From my understanding of Flink so
> far I think there are two ways to achieve this. Here are two ways to do it:
>
>
>
> 1) First Approach:
>
> a) Get positions from Kafka and key by product key.
>
> b) Perform lookup from the database for each key and then obtain
> Tuple2
>
>
>
> 2) Second Approach:
>
> a) Get positions from Kafka and key by product key.
>
> b) Window the keyed stream into say 15 seconds each.
>
> c) For each window get the unique product keys and perform a single lookup.
>
> d) Somehow join Positions and Products
>
>
>
> In the first approach we will be making a lot of calls to the DB and the
> solution is very chatty. Its hard to scale this cos the database storing
> the reference data might not be very responsive.
>
>
>
> In the second approach, I wish to join the WindowedStream with the
> SingleOutputStream and turns out I can't join a windowed stream. So I am
> not quite sure how to do that.
>
>
>
> I wanted an opinion for what is the right thing to do. Should I go with
> the first approach or the second one. If the second one, how can I
> implement the join?
>
>
>
> --
>
>
> *Regards, Harshvardhan Agrawal*
>
> --
Regards,
Harshvardhan


Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Jörn Franke
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html

You will find there a passage of the consistency model.

Probably the system is putting them into the folder and Flink is triggered before
they are consistent.

What happens after Flink puts them on S3? Are they reused by another system?
Or is it just archival?

If they are reused then probably go for a nosql solution (eg Dynamo), if they 
are just archived then use kinesis + s3

> On 24. Jul 2018, at 11:52, Averell  wrote:
> 
> Could you please help explain more details on "/try read after write
> consistency (assuming the files are not modified) /"? 
> I guess that the problem I got comes from the inconsistency in S3 files
> listing. Otherwise, I would have got exceptions on file not found.
> 
> My use case is to read output files from another system. That system was
> built some years back, and is outputting files to their S3 bucket. There is
> no file modification, only new files are being created. We want to avoid
> modifying that system.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks

2018-07-24 Thread Till Rohrmann
Hi Gerard,

the first log snippet from the client does not show anything suspicious.
The warning just says that you cannot use the Yarn CLI because it lacks the
Hadoop dependencies in the classpath.

The second snippet is indeed more interesting. If the TaskExecutors are not
notified about the changed leader, then this might indicate a problem with
the ZooKeeper connection or the ZooKeeper cluster itself. This might also
explain why the job deletion from ZooKeeper does not succeed.

One thing you could check is whether the leader ZNode under
`/flink/default/leader/dispatcher_lock` (if you are using the defaults)
actually contains the address of the newly elected leader. The leader path
should also be logged in the cluster entrypoint logs. You can use the
ZooKeeper cli for accessing the ZNodes.

Cheers,
Till

On Mon, Jul 23, 2018 at 4:07 PM Gerard Garcia  wrote:

> We have just started experiencing a different problem that could be
> related, maybe it helps to diagnose the issue.
>
> In the last 24h the jobmanager lost connection to Zookeeper a couple of
> times. Each time, a new jobmanager (on a different node) was elected leader
> correctly but the taskmanagers kept trying to connect to the old
> jobmanager. These are the final log messages before the taskmanager shut
> itself down.
>
> 12:06:41.747 [flink-akka.actor.default-dispatcher-5] WARN
> akka.remote.transport.netty.NettyTransport New I/O boss #3 - Remote
> connection to [null] failed with java.net.ConnectException: Connection
> refused: (...)1/192.168.1.9:35605
> 12:06:41.748 [flink-akka.actor.default-dispatcher-2] INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor  - Could not resolve
> ResourceManager address akka.tcp://flink@(...)1:35605/user/resourcemanager,
> retrying in 1 ms: Could not connect to rpc endpoint under address
> akka.tcp://flink@(...)1:35605/user/resourcemanager..
> 12:06:41.748 [flink-akka.actor.default-dispatcher-5] WARN
> akka.remote.ReliableDeliverySupervisor
> flink-akka.remote.default-remote-dispatcher-15 - Association with remote
> system [akka.tcp://flink@(...)1:35605] has failed, address is now gated
> for [50] ms. Reason: [Association failed with [akka.tcp://flink@(...)1:35605]]
> Caused by: [Connection refused: (...)1/192.168.1.9:35605]
> 12:06:51.766 [flink-akka.actor.default-dispatcher-5] WARN
> akka.remote.transport.netty.NettyTransport New I/O boss #3 - Remote
> connection to [null] failed with java.net.ConnectException: Connection
> refused: (...)1/192.168.1.9:35605
> 12:06:51.767 [flink-akka.actor.default-dispatcher-2] INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor  - Could not resolve
> ResourceManager address akka.tcp://flink@(...)1:35605/user/resourcemanager,
> retrying in 1 ms: Could not connect to rpc endpoint under address
> akka.tcp://flink@(...)1:35605/user/resourcemanager..
> 12:06:51.767 [flink-akka.actor.default-dispatcher-5] WARN
> akka.remote.ReliableDeliverySupervisor
> flink-akka.remote.default-remote-dispatcher-7 - Association with remote
> system [akka.tcp://flink@(...)1:35605] has failed, address is now gated
> for [50] ms. Reason: [Association failed with [akka.tcp://flink@(...)1:35605]]
> Caused by: [Connection refused: (...)1/192.168.1.9:35605]
> 12:07:01.123 [flink-akka.actor.default-dispatcher-5] ERROR
> org.apache.flink.runtime.taskexecutor.TaskExecutor  - Fatal error occurred
> in TaskExecutor akka.tcp://flink@(...)2:33455/user/taskmanager_0.
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
> Could not register at the ResourceManager within the specified maximum
> registration duration 30 ms. This indicates a problem with this
> instance. Terminating now.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1018)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$3(TaskExecutor.java:1004)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> 

Memory Logging

2018-07-24 Thread Oliver Breit
Hi everyone,

We are using a simple Flink setup with one jobmanager and one taskmanager
running inside a docker container. We are having issues enabling the
*taskmanager.debug.memory.startLogThread* setting. We added
*taskmanager.debug.memory.startLogThread: true*
*taskmanager.debug.memory.logIntervalMs: 1000*
to our flink conf (/opt/flink/conf/flink-conf.yaml).

The console prints
*2018-07-24 09:53:42,497 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.debug.memory.startLogThread, true*
(similarly for logIntervalMs). So it seems that those values are being read.

In the log folder, there is only a flink--client-bc7592c47b7b.log file with
no relevant information. We don't see any task- or jobmanager logs. We've
tried adding the env.log.dir and taskmanager.log.path directly to
flink-conf.yaml. We've also added custom log4j.properties to
/opt/flink/conf/ without any apparent success.

Is there something obvious that I am missing?

Let me know if you need more information.

Thanks!

Best,
Oliver


Re: streaming predictions

2018-07-24 Thread David Anderson
One option (which I haven't tried myself) would be to somehow get the model
into PMML format, and then use https://github.com/FlinkML/flink-jpmml to
score the model. You could either use another machine learning framework to
train the model (i.e., a framework that directly supports PMML export), or
convert the Flink model into PMML. Since SVMs are fairly simple to
describe, that might not be terribly difficult.

On Mon, Jul 23, 2018 at 4:18 AM Xingcan Cui  wrote:

> Hi Cederic,
>
> If the model is a simple function, you can just load it and make
> predictions using the map/flatMap function in the StreamEnvironment.
>
> But I’m afraid the model trained by Flink-ML should be a “batch job",
> whose predict method takes a Dataset as the parameter and outputs another
> Dataset as the result. That means you cannot easily apply the model on
> streams, at least for now.
>
> There are two options to solve this. (1) Train the dataset using another
> framework to produce a simple function. (2) Adjust your model serving as a
> series of batch jobs.
>
> Hope that helps,
> Xingcan
>
> On Jul 22, 2018, at 8:56 PM, Hequn Cheng  wrote:
>
> Hi Cederic,
>
> I am not familiar with SVM or machine learning but I think we can work it
> out together.
> What problem have you met when you try to implement this function? From my
> point of view, we can rebuild the model in the flatMap function and use it
> to predict the input data. There are some flatMap documents here[1].
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations
>
>
>
>
>
> On Sun, Jul 22, 2018 at 4:12 PM, Cederic Bosmans 
> wrote:
>
>> Dear
>>
>> My name is Cederic Bosmans and I am a masters student at the Ghent
>> University (Belgium).
>> I am currently working on my masters dissertation which involves Apache
>> Flink.
>>
>> I want to make predictions in the streaming environment based on a model
>> trained in the batch environment.
>>
>> I trained my SVM-model this way:
>> val svm2 = SVM()
>> svm2.setSeed(1)
>> svm2.fit(trainLV)
>> val testVD = testLV.map(lv => (lv.vector, lv.label))
>> val evalSet = svm2.evaluate(testVD)
>>
>> and saved the model:
>> val modelSvm = svm2.weightsOption.get
>>
>> Then I have an incoming datastream in the streaming environment:
>> dataStream[(Int, Int, Int)]
>> which should be bininary classified using this trained SVM model.
>>
>> Since the predict function does only support DataSet and not DataStream,
>> on stackoverflow a flink contributor mentioned that this should be done
>> using a map/flatMap function.
>> Unfortunately I am not able to work this function out.
>>
>> It would be incredible for me if you could help me a little bit further!
>>
>> Kind regards and thanks in advance
>> Cederic Bosmans
>>
>
>
>

-- 
*David Anderson* | Training Coordinator | data Artisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time


SingleOutputStreamOperator vs DataStream?

2018-07-24 Thread chrisr123


I'm trying to get a list of late elements in my Tumbling Windows application
and I noticed
that I need to use SingleOutputStreamOperator instead of DataStream to
get
access to the .sideOutputLateData(...) method. 

Can someone explain what the difference is between
SingleOutputStreamOperator and DataStream
and why I need to use this for getting the late data?
Thanks!

Snippet:
OutputTag<EventBean> lateEventsTag = new OutputTag<EventBean>("late-events") {};
SingleOutputStreamOperator<EventBean> windowedEvents = eventStream
    .keyBy("key")
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .sideOutputLateData(lateEventsTag)
    .process(new EventBeanProcessWindowFunction());
DataStream<EventBean> lateEvents = windowedEvents.getSideOutput(lateEventsTag);



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Averell
Could you please explain in more detail what "/try read after write
consistency (assuming the files are not modified)/" means?
I guess that the problem I got comes from the inconsistency in S3 file
listing. Otherwise, I would have got file-not-found exceptions.

My use case is to read output files from another system. That system was
built some years back, and is outputting files to their S3 bucket. There is
no file modification, only new files are being created. We want to avoid
modifying that system.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Jörn Franke
Sure, Kinesis is another way.

Can you try read-after-write consistency (assuming the files are not modified)?

In any case, it looks like you would be better suited with a NoSQL store or Kinesis
(I don't know your exact use case, so I cannot provide more details).

> On 24. Jul 2018, at 09:51, Averell  wrote:
> 
> Just some update: I tried to enable "EMRFS Consistent View" option, but it
> didn't help. Not sure whether that's what you recommended, or something
> else.
> 
> Thanks!
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Questions on Unbounded number of keys

2018-07-24 Thread Chang Liu
Dear All,

I have questions regarding the keys. In general, the questions are:
- What happens if I am doing keyBy based on an unlimited number of keys? How is
Flink managing each KeyedStream under the hood? Will I get a memory overflow,
for example, if every KeyedStream associated with a specific key takes a
certain amount of memory?
- BTW, I think it is fair to say that I have to clear my KeyedState so that the
memory used by this State is cleaned up regularly. But still, I am wondering:
even though I am regularly cleaning up State memory, what happens to the memory
used by the KeyedStream itself, if there is any? And will it keep growing?

Let me give an example for understanding it clearly.  Let’s say we have a

val requestStream: DataStream[HttpRequest]

which is a stream of HTTP requests. And by using the session ID as the key, we 
can obtain a KeyedStream per single session, as following:

val streamPerSession: KeyedStream[HttpRequest] = 
requestStream.keyBy(_.sessionId)

However, the session IDs are actually hashcodes generated randomly by the Web
service/application, which means the number of sessions is unlimited (this is
reasonable, because every time a user opens the application or logs in, he/she
will get a new unique session).

Then, the question is: will Flink eventually run out of memory because the
number of sessions is unlimited (and because we are keying by the session ID)?
- If so, how can we properly manage this situation?
- If not, could you help me understand WHY?
- Let’s also assume that we are regularly clearing the KeyedState, so the memory
used by the State will not explode.


Many Thanks and Looking forward to your reply :)

Best regards/祝好,

Chang Liu 刘畅




Re: Implement Joins with Lookup Data

2018-07-24 Thread Jain, Ankit
How often is the product db updated? Based on that you can store product 
metadata as state in Flink, maybe setup the state on cluster startup and then 
update daily etc.

Also, just based on this feature, flink doesn’t seem to add a lot of value on 
top of Kafka. As Jorn said below, you can very well store all the events in an 
external store and then periodically run a cron to enrich later since your 
processing doesn’t seem to require absolute real time.

Thanks
Ankit

From: Jörn Franke 
Date: Monday, July 23, 2018 at 10:10 PM
To: Harshvardhan Agrawal 
Cc: 
Subject: Re: Implement Joins with Lookup Data

For the first one (lookup of single entries) you could use a NoSQL db (eg key 
value store) - a relational database will not scale.

Depending on when you need to do the enrichment you could also first store the 
data and enrich it later as part of a batch process.

On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
Hi,

We are using Flink for financial data enrichment and aggregations. We have 
Positions data that we are currently receiving from Kafka. We want to enrich 
that data with reference data like Product and Account information that is 
present in a relational database. From my understanding of Flink so far I think 
there are two ways to achieve this. Here are two ways to do it:

1) First Approach:
a) Get positions from Kafka and key by product key.
b) Perform lookup from the database for each key and then obtain 
Tuple2

2) Second Approach:
a) Get positions from Kafka and key by product key.
b) Window the keyed stream into say 15 seconds each.
c) For each window get the unique product keys and perform a single lookup.
d) Somehow join Positions and Products

In the first approach we will be making a lot of calls to the DB and the 
solution is very chatty. Its hard to scale this cos the database storing the 
reference data might not be very responsive.

In the second approach, I wish to join the WindowedStream with the 
SingleOutputStream and turns out I can't join a windowed stream. So I am not 
quite sure how to do that.

I wanted an opinion on what is the right thing to do. Should I go with the
first approach or the second one? If the second one, how can I implement the
join?

--
Regards,
Harshvardhan Agrawal


Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Averell
Just some update: I tried to enable "EMRFS Consistent View" option, but it
didn't help. Not sure whether that's what you recommended, or something
else.

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Question regarding State in full outer join

2018-07-24 Thread Fabian Hueske
Hi Darshan,

The join implementation in SQL / Table API does what is demanded by the SQL
semantics.
Hence, what results to emit and also what data to store (state) to compute
these results is pretty much given.
You can think of the semantics of the join as writing both streams into a
relational DBMS and executing the join on the DBMS.
The DBMS computes the join result on all data at once. In contrast, Flink
computes the results continuously whenever a new record arrives.

It might be that your join requirements do not match the semantics of a SQL
join.
In that case, you might be better off with a custom implementation based on
a ProcessFunction as Vino pointed out.

Btw. from your description, it looks like your use case could be addressed
by the time-versioned / enrichment join that is currently being developed
[1] for SQL / Table API.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9712
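
As a bare-bones illustration of that ProcessFunction-based alternative, keeping
only the latest value per key on each side (all types and names below are made
up, and the emit-on-update semantics differ from the SQL join):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

case class LeftEvent(key: String, value: String)
case class RightEvent(key: String, value: String)
case class Joined(key: String, left: Option[String], right: Option[String])

class LatestValueJoin extends CoProcessFunction[LeftEvent, RightEvent, Joined] {

  private lazy val latestLeft: ValueState[String] = getRuntimeContext.getState(
    new ValueStateDescriptor[String]("latest-left", classOf[String]))
  private lazy val latestRight: ValueState[String] = getRuntimeContext.getState(
    new ValueStateDescriptor[String]("latest-right", classOf[String]))

  override def processElement1(
      l: LeftEvent,
      ctx: CoProcessFunction[LeftEvent, RightEvent, Joined]#Context,
      out: Collector[Joined]): Unit = {
    latestLeft.update(l.value)  // overwrite: only one value per key is retained
    out.collect(Joined(l.key, Some(l.value), Option(latestRight.value())))
  }

  override def processElement2(
      r: RightEvent,
      ctx: CoProcessFunction[LeftEvent, RightEvent, Joined]#Context,
      out: Collector[Joined]): Unit = {
    latestRight.update(r.value)  // overwrite: only one value per key is retained
    out.collect(Joined(r.key, Option(latestLeft.value()), Some(r.value)))
  }
}

// usage: leftStream.connect(rightStream).keyBy(_.key, _.key).process(new LatestValueJoin)
// A timer (ctx.timerService()) could additionally clear both states after the
// 50-60 day inactivity window to bound the state size.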

2018-07-24 8:17 GMT+02:00 vino yang :

> Hi Darshan,
>
> In your use case, I think you can implement the outer join with DataStream
> API ( use State + ProcessFunction + Timer ). Using suitable statue, you can
> store 1 value per key and do not need to keep all the value's history for
> every key.
>
> And you can refer to Flink's implementation of DataStream join[1].
>
> [1]: https://github.com/apache/flink/blob/master/
> flink-libraries/flink-table/src/main/scala/org/apache/
> flink/table/plan/nodes/datastream/DataStreamJoin.scala#L223
>
> Thanks, vino.
>
> 2018-07-24 1:28 GMT+08:00 Darshan Singh :
>
>> Hi
>>
>> I was looking at the new full outer join. This seems to be working fine
>> for my use case however I have a question regarding the state size.
>>
>> I have 2 streams each will have 100's of million unique keys. Also, Each
>> of these will get the updated value of keys 100's of times per day.
>>
>> As per my understanding in full outer join flink will keep all the values
>> of the keys which it has seen in the state and whenever a new value comes
>> from
>> 1 of the stream. It will be joined against all of the key values which
>> were there for 2nd stream.It could be 1 or 100's of rows. This seems
>> inefficient
>> but my question is more on the state side. Thus, I will need to keep
>> billion's of values in state on both side. This will be very expensive.
>>
>> It is a non windowed join. A key can recieve updates for 50-60 days and
>> after that it wont get any updates on any of the streams.
>>
>> Is there a way we could use a state such that only 1 value per key is
>> retained in the state to reduce the size of the state?
>>
>> I am using the Table API but could use the Datastream api if needed.
>>
>> Thanks
>>
>
>


Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Averell
Hi Jörn,
Thanks. I had missed that EMRFS strong consistency configuration. Will try
that now.
We also had a backup solution - using Kinesis instead of S3 (I don't see
Kinesis in your suggestion, but hope that it would be alright).

"/The small size and high rate is not suitable for S3 or HDFS/" <<<
regarding this, is there any guidelines on how big the file size should be
before we should consider S3/HDFS?

Thanks a lot. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 file source - continuous monitoring - many files missed

2018-07-24 Thread Jörn Franke
It could be related to S3, which seems to be configured for eventual consistency.
Maybe it helps to configure strong consistency.

However, I recommend replacing S3 with a NoSQL database (since you are on Amazon,
DynamoDB would help, plus DynamoDB Streams; alternatively SNS or SQS). The small
file size and high rate are not a good fit for S3 or HDFS.

> On 24. Jul 2018, at 07:59, Averell  wrote:
> 
> Good day everyone,
> 
> I have a Flink job that has an S3 folder as a source, and we keep putting
> thousands of small (around 1KB each) gzip files into that folder, with the
> rate of about 5000 files per minute. Here is how I created that source in
> Scala:
> 
> val my_input_format = new TextInputFormat(
>   new org.apache.flink.core.fs.Path(my_path))
> my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
> my_input_format.setNestedFileEnumeration(true)
>
> val my_raw_stream = streamEnv
>   .readFile(my_input_format,
>     my_path,
>     FileProcessingMode.PROCESS_CONTINUOUSLY,
>     1000)
> The problem is, with the monitoring interval of 1,000ms as above, about 20%
> of the files were missed. From Apache Flink Dashboard, at the subsequent
> operators, I could only see ~80% of the total number of files recorded
> ("Records sent" column).
> 
> If I increase the monitoring interval, the number of missed files would
> reduce. At 5,000ms, it is about 10%, and at 30,000ms, only about 2% missed.
> 
> No WARNING/ERROR recorded though.
> 
> I could not simulate this in HDFS, as I could not reach that high file
> writing speed in our cluster.
> 
> Could someone please help. Thank you very much.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Question regarding State in full outer join

2018-07-24 Thread vino yang
Hi Darshan,

In your use case, I think you can implement the outer join with the DataStream
API (using State + ProcessFunction + Timer). With suitable state, you can
store one value per key and do not need to keep the whole value history for
every key.

And you can refer to Flink's implementation of DataStream join[1].

[1]:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala#L223

Thanks, vino.

2018-07-24 1:28 GMT+08:00 Darshan Singh :

> Hi
>
> I was looking at the new full outer join. This seems to be working fine
> for my use case however I have a question regarding the state size.
>
> I have 2 streams each will have 100's of million unique keys. Also, Each
> of these will get the updated value of keys 100's of times per day.
>
> As per my understanding in full outer join flink will keep all the values
> of the keys which it has seen in the state and whenever a new value comes
> from
> 1 of the stream. It will be joined against all of the key values which
> were there for 2nd stream.It could be 1 or 100's of rows. This seems
> inefficient
> but my question is more on the state side. Thus, I will need to keep
> billion's of values in state on both side. This will be very expensive.
>
> It is a non windowed join. A key can recieve updates for 50-60 days and
> after that it wont get any updates on any of the streams.
>
> Is there a way we could use a state such that only 1 value per key is
> retained in the state to reduce the size of the state?
>
> I am using the Table API but could use the Datastream api if needed.
>
> Thanks
>