Re: S3 file source - continuous monitoring - many files missed
Hello Fabian, I created the JIRA bug https://issues.apache.org/jira/browse/FLINK-9940 BTW, I have one more question: is it worth checkpointing that list of processed files? Does the current implementation of the file source guarantee exactly-once? Thanks for your support. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: S3 file source - continuous monitoring - many files missed
Thank you Fabian. I implemented a quick test based on what you suggested (applying an offset to the system time) and I did see an improvement: with an offset of 500 ms the problem went away completely. With an offset of 50 ms, I still had around 3-5 files missed out of 10,000. That remainder might come from the clock difference between the EC2 instance and S3. I will now try to implement exactly what you suggested, and open a Jira issue as well. Thanks for your help. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
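For readers following along, the offset idea can be sketched roughly as below. This is plain Java against Flink's FileSystem API; the class name, the unbounded de-duplication set, and the tolerance value are illustrative simplifications, not the eventual FLINK-9940 fix.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

/**
 * Rough sketch of offset-based file discovery: instead of only accepting files
 * whose modification time is strictly greater than the largest timestamp seen so
 * far, accept everything newer than (maxSeen - offset) and de-duplicate by path.
 */
public class OffsetFileMonitor {

    private final Path monitoredDir;
    private final long offsetMillis;                       // e.g. 500 ms tolerance for clock skew
    private final Set<String> processedPaths = new HashSet<>();  // grows unboundedly in this sketch
    private long maxModificationTime = 0L;

    public OffsetFileMonitor(String directory, long offsetMillis) {
        this.monitoredDir = new Path(directory);
        this.offsetMillis = offsetMillis;
    }

    /** Returns the files that became visible since the last scan. */
    public List<Path> discoverNewFiles() throws Exception {
        FileSystem fs = monitoredDir.getFileSystem();
        List<Path> newFiles = new ArrayList<>();
        for (FileStatus status : fs.listStatus(monitoredDir)) {
            long modTime = status.getModificationTime();
            String path = status.getPath().toString();
            // Tolerate timestamps that lag slightly behind the newest one seen so far.
            if (modTime > maxModificationTime - offsetMillis && processedPaths.add(path)) {
                newFiles.add(status.getPath());
                maxModificationTime = Math.max(maxModificationTime, modTime);
            }
        }
        return newFiles;
    }
}
```

The de-duplication set is what makes the relaxed timestamp comparison safe; a production version would have to prune it (or checkpoint it) rather than keep every path forever.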
Best way to find the current alive jobmanager with HA mode zookeeper
Hi all, I have a standalone cluster with 3 JobManagers and high-availability set to zookeeper. Our client submits jobs via the REST API (POST /jars/:jarid/run), which means we need to know the host of one of the currently alive JobManagers. The problem is: how can we know which JobManager is alive, or the host of the current leader? We don't want to send requests to a dead JM. Thanks. Youjun Yuan
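One pragmatic workaround in such setups is to probe each candidate JobManager's REST endpoint and submit to the first one that responds. A minimal sketch follows; the host list, port, and the use of the /overview endpoint are illustrative assumptions, and reading the leader address from ZooKeeper would be the cleaner solution.

```java
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.List;

/**
 * Probes the REST endpoint of every configured JobManager and returns the base
 * URL of the first one that answers, so the client knows where to POST its job.
 */
public class JobManagerProbe {

    // Hypothetical hosts; replace with the three JobManagers of the standalone cluster.
    private static final List<String> CANDIDATES =
            Arrays.asList("http://jm1:8081", "http://jm2:8081", "http://jm3:8081");

    public static String findReachableJobManager() {
        for (String base : CANDIDATES) {
            try {
                HttpURLConnection conn =
                        (HttpURLConnection) new URL(base + "/overview").openConnection();
                conn.setConnectTimeout(2000);
                conn.setReadTimeout(2000);
                if (conn.getResponseCode() == 200) {
                    return base;   // this JobManager is up and serving REST requests
                }
            } catch (Exception ignored) {
                // host unreachable or not serving; try the next candidate
            }
        }
        throw new IllegalStateException("No reachable JobManager found");
    }

    public static void main(String[] args) {
        System.out.println("Submit jobs to: " + findReachableJobManager());
    }
}
```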
Re: downgrade Flink
Hi Cederic, I just looked at the project you linked; its README contains the following statement: *“flink-jpmml is tested with the latest Flink (i.e. 1.3.2), but any working Apache Flink version (repo) should work properly.”* The project was started about a year ago and should not depend on versions prior to Flink 1.0, but you may want to confirm that. Thanks, vino. 2018-07-25 6:44 GMT+08:00 Cederic Bosmans : > Dear > > I am working on a streaming prediction model for which I want to try to > use the flink-jpmml extension (https://github.com/FlinkML/flink-jpmml). > Unfortunately, it only supports the 0.7.0-SNAPSHOT and 0.6.1 > versions of Flink, and I am using the 1.7-SNAPSHOT version of Flink. > How can I downgrade my version? > (The examples are written for sbt and I am using Maven.) > Thank you very much! > > Kind regards > Cederic > >
Re: Flink 1.5 batch job fails to start
Hi Alex, Is it possible that the data has been corrupted? Or have you confirmed that the avro version is consistent in different Flink versions? Also, if you don't upgrade Flink and still use version 1.3.1, can it be recovered? Thanks, vino. 2018-07-25 8:32 GMT+08:00 Alex Vinnik : > Vino, > > Upgraded flink to Hadoop 2.8.1 > > $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep > entrypoint | grep 'Hadoop version' > 2018-07-25T00:19:46.142+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO Hadoop version: 2.8.1 > > but job still fails to start > > Ideas? > > Caused by: org.apache.flink.util.FlinkException: Failed to submit job > d84cccd3bffcba1f243352a5e5ef99a9. > at org.apache.flink.runtime.dispatcher.Dispatcher. > submitJob(Dispatcher.java:254) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation( > AkkaRpcActor.java:247) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor. > handleRpcMessage(AkkaRpcActor.java:162) > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage( > FencedAkkaRpcActor.java:70) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive( > AkkaRpcActor.java:142) > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor. > onReceive(FencedAkkaRpcActor.java:40) > at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse( > UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > ... 4 more > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could > not set up JobManager > at org.apache.flink.runtime.jobmaster.JobManagerRunner.< > init>(JobManagerRunner.java:169) > at org.apache.flink.runtime.dispatcher.Dispatcher$ > DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885) > at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner( > Dispatcher.java:287) > at org.apache.flink.runtime.dispatcher.Dispatcher.runJob( > Dispatcher.java:277) > at org.apache.flink.runtime.dispatcher.Dispatcher. > persistAndRunJob(Dispatcher.java:262) > at org.apache.flink.runtime.dispatcher.Dispatcher. > submitJob(Dispatcher.java:249) > ... 21 more > Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot > initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce. > HadoopOutputFormat@a3123a9)': Deserializing the OutputFormat > (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) > failed: unread block data > at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder. > buildGraph(ExecutionGraphBuilder.java:220) > at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder. > buildGraph(ExecutionGraphBuilder.java:100) > at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph( > JobMaster.java:1150) > at org.apache.flink.runtime.jobmaster.JobMaster. 
> createAndRestoreExecutionGraph(JobMaster.java:1130) > at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:298) > at org.apache.flink.runtime.jobmaster.JobManagerRunner.< > init>(JobManagerRunner.java:151) > ... 26 more > Caused by: java.lang.Exception: Deserializing the OutputFormat > (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) > failed: unread block data > at org.apache.flink.runtime.jobgraph.OutputFormatVertex. > initializeOnMaster(OutputFormatVertex.java:63) > at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder. > buildGraph(ExecutionGraphBuilder.java:216) > ... 31 more > Caused by: java.lang.IllegalStateException: unread block data > at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode( > ObjectInputStream.java:2781) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603) > at java.io.ObjectInputStream.defaultReadFields( > ObjectInputStream.java:2285) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) > at java.io.ObjectInputStream.readOrdinaryObject( > ObjectInputStream.java:2067) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) > at org.apache.flink.util.InstantiationUtil.deserializeObject( > InstantiationUtil.java:488) > at org.apache.flink.util.InstantiationUtil.deserializeObject( > InstantiationUtil.java:475) > at org.apache.flink.util.InstantiationUtil.deserializeObject( >
Re: Flink 1.5 batch job fails to start
Vino, Upgraded flink to Hadoop 2.8.1 $ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep entrypoint | grep 'Hadoop version' 2018-07-25T00:19:46.142+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop version: 2.8.1 but job still fails to start Ideas? Caused by: org.apache.flink.util.FlinkException: Failed to submit job d84cccd3bffcba1f243352a5e5ef99a9. at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ... 4 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager at org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:169) at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885) at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287) at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277) at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262) at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249) ... 21 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100) at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150) at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130) at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:298) at org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:151) ... 
26 more Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216) ... 31 more Caused by: java.lang.IllegalStateException: unread block data at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424) at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288) at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60) ... 32 more On Tue, Jul 24, 2018 at 10:32 AM vino yang wrote: > Hi Alex, > > Based on your log information, the potential reason is Hadoop version. To > troubleshoot the exception comes from different Hadoop version. I suggest >
downgrade Flink
Dear I am working on a streaming prediction model for which I want to try to use the flink-jpmml extension (https://github.com/FlinkML/flink-jpmml). Unfortunately, it only supports the 0.7.0-SNAPSHOT and 0.6.1 versions of Flink, and I am using the 1.7-SNAPSHOT version of Flink. How can I downgrade my version? (The examples are written for sbt and I am using Maven.) Thank you very much! Kind regards Cederic
Re: Recommended fat jar excludes?
The previous list excluded a number of dependencies to prevent clashes with Flink (for example netty), which is no longer required. If you could provide the output of "mvn dependency:tree", we might be able to figure out why the jar is larger. On 24.07.2018 20:49, jlist9 wrote: We started out with a sample project from an earlier version of flink-java. The sample project's pom.xml contained a long list of exclude entries for building the fat jar. The fat jar size is slightly over 100MB in our case. We are looking to upgrade to Flink 1.5, so we updated the pom.xml using one generated with the Maven command on the Flink 1.5 quick start page: https://ci.apache.org/projects/flink/flink-docs-release-1.5/quickstart/java_api_quickstart.html The fat jar from the same code increases to over 200MB. I notice that the new pom.xml file only has a very short exclude list (4 lines). Is there a recommended exclude list for Flink 1.5? I could use the exclude list from the earlier version and do a bit of trial and error to clean it up, but if there is an updated list for 1.5, it'll probably save us a lot of time. File size isn't so much of an issue for storage; the concern is that in our process we copy the pipeline jar files, so it'll take twice the time before we can start the job. Thanks! Jack
Recommended fat jar excludes?
We started out with a sample project from an earlier version of flink-java. The sample project's pom.xml contained a long list of exclude entries for building the fat jar. The fat jar size is slightly over 100MB in our case. We are looking to upgrade to Flink 1.5, so we updated the pom.xml using one generated with the Maven command on the Flink 1.5 quick start page: https://ci.apache.org/projects/flink/flink-docs-release-1.5/quickstart/java_api_quickstart.html The fat jar from the same code increases to over 200MB. I notice that the new pom.xml file only has a very short exclude list (4 lines). Is there a recommended exclude list for Flink 1.5? I could use the exclude list from the earlier version and do a bit of trial and error to clean it up, but if there is an updated list for 1.5, it'll probably save us a lot of time. File size isn't so much of an issue for storage; the concern is that in our process we copy the pipeline jar files, so it'll take twice the time before we can start the job. Thanks! Jack
Avro writer has already been opened
Hi, there, I am using avro format and write data to S3, recently upgraded from Flink 1.3.2 to 1.5 and noticed the following errors as below: I am using RocksDB and checkpointDataUri is an S3 location. My writer looks like something below. val writer = new AvroKeyValueSinkWriter[String, R](properties).duplicate() sink.setWriter(writer.duplicate()) 2018-07-24 17:50:44,012 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed (4/4) (28f918a31d273e176409de3d4cb46c3c) switched from RUNNING to FAILED. java.lang.IllegalStateException: Writer has already been opened at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:69) at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:151) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:561) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446) at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748) 2018-07-24 17:50:44,015 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 28 of job cc73a9db44814dc3d5a5ce497c8b0389 because: Writer has already been opened 2018-07-24 17:50:44,016 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Enrollment Log Member and Chapter (cc73a9db44814dc3d5a5ce497c8b0389) switched from state RUNNING to FAILING. java.lang.IllegalStateException: Writer has already been opened at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:69) at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.open(AvroKeyValueSinkWriter.java:151) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:561) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446) at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748) Any help would be greatly appreciated. Thanks! Regards, Chengzhi
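For context on the API being discussed, here is a minimal Java sketch of how BucketingSink and AvroKeyValueSinkWriter are typically wired together. The bucket path, schemas, and the Tuple2<String, GenericRecord> element type are illustrative assumptions; this is not a diagnosis of the exception above. The writer is handed over once, and the sink calls duplicate() itself whenever it opens a new part file.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class AvroS3SinkExample {

    /** Attaches a bucketing Avro key/value sink to a stream of (key, record) pairs. */
    public static void attachSink(DataStream<Tuple2<String, GenericRecord>> stream,
                                  Schema valueSchema) {
        Map<String, String> properties = new HashMap<>();
        properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA,
                Schema.create(Schema.Type.STRING).toString());
        properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA,
                valueSchema.toString());

        BucketingSink<Tuple2<String, GenericRecord>> sink =
                new BucketingSink<>("s3a://my-bucket/output");   // illustrative path
        // Pass the writer once; the sink duplicates it internally per part file.
        sink.setWriter(new AvroKeyValueSinkWriter<String, GenericRecord>(properties));

        stream.addSink(sink);
    }
}
```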
Re: Implement Joins with Lookup Data
App is checkpointing, so will pick up if an operation fails. I suppose you mean a TM completely crashes and even in that case another TM would spin up and it “should” pick up from checkpoint. We are running YARN but I would assume TM recovery would be possible in any other cluster. I havent tested this specifically during init phase but we have killed TMs during normal processing as test case in stateful processing and dont remember seeing an issue. - Ashish On Tuesday, July 24, 2018, 12:31 PM, Harshvardhan Agrawal wrote: What happens when one of your workers dies? Say the machine is dead is not recoverable. How do you recover from that situation? Will the pipeline die and you go over the entire bootstrap process? On Tue, Jul 24, 2018 at 11:56 ashish pok wrote: BTW, We got around bootstrap problem for similar use case using a “nohup” topic as input stream. Our CICD pipeline currently passes an initialize option to app IF there is a need to bootstrap and waits for X minutes before taking a savepoint and restart app normally listening to right topic(s). I believe there is work underway to handle this gracefully using Side Input as well. Other than determining X minutes for initialization to complete, we havent had any issue with this solution - we have over 40 million states refreshes daily and close to 200Mbps input streams being joined to states. Hope this helps! - Ashish On Tuesday, July 24, 2018, 11:37 AM, Elias Levy wrote: Alas, this suffer from the bootstrap problem. At the moment Flink does not allow you to pause a source (the positions), so you can't fully consume the and preload the accounts or products to perform the join before the positions start flowing. Additionally, Flink SQL does not support materializing an upset table for the accounts or products to perform the join, so yo have to develop your own KeyedProcessFunction, maintain the state, and perform the join on your own if you only want to join against the latest value for each key. On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann wrote: Yes, using Kafka which you initialize with the initial values and then feed changes to the Kafka topic from which you consume could be a solution. On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal wrote: Hi Till, How would we do the initial hydration of the Product and Account data since it’s currently in a relational DB? Do we have to copy over data to Kafka and then use them? Regards,Harsh On Tue, Jul 24, 2018 at 09:22 Till Rohrmann wrote: Hi Harshvardhan, I agree with Ankit that this problem could actually be solved quite elegantly with Flink's state. If you can ingest the product/account information changes as a stream, you can keep the latest version of it in Flink state by using a co-map function [1, 2]. One input of the co-map function would be the product/account update stream which updates the respective entries in Flink's state and the other input stream is the one to be enriched. When receiving input from this stream one would lookup the latest information contained in the operator's state and join it with the incoming event. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html Cheers,Till On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal wrote: Hi, Thanks for your responses. There is no fixed interval for the data being updated. 
It’s more like whenever you onboard a new product or there are any mandates that change will trigger the reference data to change. It’s not just the enrichment we are doing here. Once we have enriched the data we will be performing a bunch of aggregations using the enriched data. Which approach would you recommend? Regards,Harshvardhan On Tue, Jul 24, 2018 at 04:04 Jain, Ankit wrote: How often is the product db updated? Based on that you can store product metadata as state in Flink, maybe setup the state on cluster startup and then update daily etc. Also, just based on this feature, flink doesn’t seem to add a lot of value on top of Kafka. As Jorn said below, you can very well store all the events in an external store and then periodically run a cron to enrich later since your processing doesn’t seem to require absolute real time. Thanks Ankit From: Jörn Franke Date: Monday, July 23, 2018 at 10:10 PM To: Harshvardhan Agrawal Cc: Subject: Re: Implement Joins with Lookup Data For the first one (lookup of single entries) you could use a NoSQL db (eg key value store) - a relational database will not scale. Depending on when you need to do the enrichment you could also first store the data and enrich it later as part of a batch process. On 24. Jul 2018, at 05:25, Harshvardhan Agrawal wrote: Hi, We are using Flink for financial data enrichment and aggregations. We have Positions data that we are currently
Re: LoggingFactory: Access to static method across operators
Hi Jayant, I think you should be able to implement your own StaticLoggerBinder which returns your own LoggerFactory. That is quite similar to how the different logging backends (log4j, logback) integrate with slf4j. Cheers, Till On Tue, Jul 24, 2018 at 5:41 PM Jayant Ameta wrote: > I am using a custom LoggingFactory. Is there a way to provide access to > this custom LoggingFactory to all the operators other than adding it to all > constructors? > > This is somewhat related to: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Adding-Context-To-Logs-td7351.html > > > Jayant >
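A minimal sketch of what Till describes, assuming slf4j 1.7.x: a class named org.slf4j.impl.StaticLoggerBinder on the classpath is picked up by slf4j, so every LoggerFactory.getLogger(...) call in every operator goes through the factory returned here, without passing anything via constructors. The CustomLoggerFactory below is a placeholder for the user's own LoggingFactory.

```java
package org.slf4j.impl;   // slf4j discovers the binder under exactly this package

import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
import org.slf4j.helpers.NOPLogger;
import org.slf4j.spi.LoggerFactoryBinder;

/** Custom slf4j binding that routes all loggers through a user-defined factory. */
public class StaticLoggerBinder implements LoggerFactoryBinder {

    // Deliberately non-final, as in the stock bindings, to avoid constant folding.
    public static String REQUESTED_API_VERSION = "1.7.25";

    private static final StaticLoggerBinder SINGLETON = new StaticLoggerBinder();

    // Placeholder factory; in practice this would be the custom LoggingFactory.
    private final ILoggerFactory loggerFactory = new CustomLoggerFactory();

    public static StaticLoggerBinder getSingleton() {
        return SINGLETON;
    }

    @Override
    public ILoggerFactory getLoggerFactory() {
        return loggerFactory;
    }

    @Override
    public String getLoggerFactoryClassStr() {
        return CustomLoggerFactory.class.getName();
    }

    /** Stand-in implementation that hands out no-op loggers. */
    static final class CustomLoggerFactory implements ILoggerFactory {
        @Override
        public Logger getLogger(String name) {
            return NOPLogger.NOP_LOGGER;
        }
    }
}
```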
Re: Implement Joins with Lookup Data
What happens when one of your workers dies? Say the machine is dead is not recoverable. How do you recover from that situation? Will the pipeline die and you go over the entire bootstrap process? On Tue, Jul 24, 2018 at 11:56 ashish pok wrote: > BTW, > > We got around bootstrap problem for similar use case using a “nohup” topic > as input stream. Our CICD pipeline currently passes an initialize option to > app IF there is a need to bootstrap and waits for X minutes before taking a > savepoint and restart app normally listening to right topic(s). I believe > there is work underway to handle this gracefully using Side Input as well. > Other than determining X minutes for initialization to complete, we havent > had any issue with this solution - we have over 40 million states refreshes > daily and close to 200Mbps input streams being joined to states. > > Hope this helps! > > > > - Ashish > > On Tuesday, July 24, 2018, 11:37 AM, Elias Levy < > fearsome.lucid...@gmail.com> wrote: > > Alas, this suffer from the bootstrap problem. At the moment Flink does > not allow you to pause a source (the positions), so you can't fully consume > the and preload the accounts or products to perform the join before the > positions start flowing. Additionally, Flink SQL does not support > materializing an upset table for the accounts or products to perform the > join, so yo have to develop your own KeyedProcessFunction, maintain the > state, and perform the join on your own if you only want to join against > the latest value for each key. > > On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann > wrote: > > Yes, using Kafka which you initialize with the initial values and then > feed changes to the Kafka topic from which you consume could be a solution. > > On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal < > harshvardhan.ag...@gmail.com> wrote: > > Hi Till, > > How would we do the initial hydration of the Product and Account data > since it’s currently in a relational DB? Do we have to copy over data to > Kafka and then use them? > > Regards, > Harsh > > On Tue, Jul 24, 2018 at 09:22 Till Rohrmann wrote: > > Hi Harshvardhan, > > I agree with Ankit that this problem could actually be solved quite > elegantly with Flink's state. If you can ingest the product/account > information changes as a stream, you can keep the latest version of it in > Flink state by using a co-map function [1, 2]. One input of the co-map > function would be the product/account update stream which updates the > respective entries in Flink's state and the other input stream is the one > to be enriched. When receiving input from this stream one would lookup the > latest information contained in the operator's state and join it with the > incoming event. > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/ > [2] > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html > > Cheers, > Till > > On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal < > harshvardhan.ag...@gmail.com> wrote: > > Hi, > > Thanks for your responses. > > There is no fixed interval for the data being updated. It’s more like > whenever you onboard a new product or there are any mandates that change > will trigger the reference data to change. > > It’s not just the enrichment we are doing here. Once we have enriched the > data we will be performing a bunch of aggregations using the enriched data. > > Which approach would you recommend? 
> > Regards, > Harshvardhan > > On Tue, Jul 24, 2018 at 04:04 Jain, Ankit wrote: > > How often is the product db updated? Based on that you can store product > metadata as state in Flink, maybe setup the state on cluster startup and > then update daily etc. > > > > Also, just based on this feature, flink doesn’t seem to add a lot of value > on top of Kafka. As Jorn said below, you can very well store all the events > in an external store and then periodically run a cron to enrich later since > your processing doesn’t seem to require absolute real time. > > > > Thanks > > Ankit > > > > *From: *Jörn Franke > *Date: *Monday, July 23, 2018 at 10:10 PM > *To: *Harshvardhan Agrawal > *Cc: * > *Subject: *Re: Implement Joins with Lookup Data > > > > For the first one (lookup of single entries) you could use a NoSQL db (eg > key value store) - a relational database will not scale. > > > > Depending on when you need to do the enrichment you could also first store > the data and enrich it later as part of a batch process. > > > On 24. Jul 2018, at 05:25, Harshvardhan Agrawal < > harshvardhan.ag...@gmail.com> wrote: > > Hi, > > > > We are using Flink for financial data enrichment and aggregations. We have > Positions data that we are currently receiving from Kafka. We want to > enrich that data with reference data like Product and Account information > that is present in a relational database. From my understanding of Flink so > far I think there are two ways
Re: Implement Joins with Lookup Data
BTW, We got around bootstrap problem for similar use case using a “nohup” topic as input stream. Our CICD pipeline currently passes an initialize option to app IF there is a need to bootstrap and waits for X minutes before taking a savepoint and restart app normally listening to right topic(s). I believe there is work underway to handle this gracefully using Side Input as well. Other than determining X minutes for initialization to complete, we havent had any issue with this solution - we have over 40 million states refreshes daily and close to 200Mbps input streams being joined to states. Hope this helps! - Ashish On Tuesday, July 24, 2018, 11:37 AM, Elias Levy wrote: Alas, this suffer from the bootstrap problem. At the moment Flink does not allow you to pause a source (the positions), so you can't fully consume the and preload the accounts or products to perform the join before the positions start flowing. Additionally, Flink SQL does not support materializing an upset table for the accounts or products to perform the join, so yo have to develop your own KeyedProcessFunction, maintain the state, and perform the join on your own if you only want to join against the latest value for each key. On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann wrote: Yes, using Kafka which you initialize with the initial values and then feed changes to the Kafka topic from which you consume could be a solution. On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal wrote: Hi Till, How would we do the initial hydration of the Product and Account data since it’s currently in a relational DB? Do we have to copy over data to Kafka and then use them? Regards,Harsh On Tue, Jul 24, 2018 at 09:22 Till Rohrmann wrote: Hi Harshvardhan, I agree with Ankit that this problem could actually be solved quite elegantly with Flink's state. If you can ingest the product/account information changes as a stream, you can keep the latest version of it in Flink state by using a co-map function [1, 2]. One input of the co-map function would be the product/account update stream which updates the respective entries in Flink's state and the other input stream is the one to be enriched. When receiving input from this stream one would lookup the latest information contained in the operator's state and join it with the incoming event. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html Cheers,Till On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal wrote: Hi, Thanks for your responses. There is no fixed interval for the data being updated. It’s more like whenever you onboard a new product or there are any mandates that change will trigger the reference data to change. It’s not just the enrichment we are doing here. Once we have enriched the data we will be performing a bunch of aggregations using the enriched data. Which approach would you recommend? Regards,Harshvardhan On Tue, Jul 24, 2018 at 04:04 Jain, Ankit wrote: How often is the product db updated? Based on that you can store product metadata as state in Flink, maybe setup the state on cluster startup and then update daily etc. Also, just based on this feature, flink doesn’t seem to add a lot of value on top of Kafka. As Jorn said below, you can very well store all the events in an external store and then periodically run a cron to enrich later since your processing doesn’t seem to require absolute real time. 
Thanks Ankit From: Jörn Franke Date: Monday, July 23, 2018 at 10:10 PM To: Harshvardhan Agrawal Cc: Subject: Re: Implement Joins with Lookup Data For the first one (lookup of single entries) you could use a NoSQL db (eg key value store) - a relational database will not scale. Depending on when you need to do the enrichment you could also first store the data and enrich it later as part of a batch process. On 24. Jul 2018, at 05:25, Harshvardhan Agrawal wrote: Hi, We are using Flink for financial data enrichment and aggregations. We have Positions data that we are currently receiving from Kafka. We want to enrich that data with reference data like Product and Account information that is present in a relational database. From my understanding of Flink so far I think there are two ways to achieve this. Here are two ways to do it: 1) First Approach: a) Get positions from Kafka and key by product key. b) Perform lookup from the database for each key and then obtain Tuple2 2) Second Approach: a) Get positions from Kafka and key by product key. b) Window the keyed stream into say 15 seconds each. c) For each window get the unique product keys and perform a single lookup. d) Somehow join Positions and Products In the first approach we will be making a lot of calls to the DB and the solution is very chatty. Its hard to scale this cos
LoggingFactory: Access to static method across operators
I am using a custom LoggingFactory. Is there a way to provide access to this custom LoggingFactory to all the operators other than adding it to all constructors? This is somewhat related to: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Adding-Context-To-Logs-td7351.html Jayant
Re: Implement Joins with Lookup Data
Alas, this suffer from the bootstrap problem. At the moment Flink does not allow you to pause a source (the positions), so you can't fully consume the and preload the accounts or products to perform the join before the positions start flowing. Additionally, Flink SQL does not support materializing an upset table for the accounts or products to perform the join, so yo have to develop your own KeyedProcessFunction, maintain the state, and perform the join on your own if you only want to join against the latest value for each key. On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann wrote: > Yes, using Kafka which you initialize with the initial values and then > feed changes to the Kafka topic from which you consume could be a solution. > > On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal < > harshvardhan.ag...@gmail.com> wrote: > >> Hi Till, >> >> How would we do the initial hydration of the Product and Account data >> since it’s currently in a relational DB? Do we have to copy over data to >> Kafka and then use them? >> >> Regards, >> Harsh >> >> On Tue, Jul 24, 2018 at 09:22 Till Rohrmann wrote: >> >>> Hi Harshvardhan, >>> >>> I agree with Ankit that this problem could actually be solved quite >>> elegantly with Flink's state. If you can ingest the product/account >>> information changes as a stream, you can keep the latest version of it in >>> Flink state by using a co-map function [1, 2]. One input of the co-map >>> function would be the product/account update stream which updates the >>> respective entries in Flink's state and the other input stream is the one >>> to be enriched. When receiving input from this stream one would lookup the >>> latest information contained in the operator's state and join it with the >>> incoming event. >>> >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/ >>> [2] >>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html >>> >>> Cheers, >>> Till >>> >>> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal < >>> harshvardhan.ag...@gmail.com> wrote: >>> Hi, Thanks for your responses. There is no fixed interval for the data being updated. It’s more like whenever you onboard a new product or there are any mandates that change will trigger the reference data to change. It’s not just the enrichment we are doing here. Once we have enriched the data we will be performing a bunch of aggregations using the enriched data. Which approach would you recommend? Regards, Harshvardhan On Tue, Jul 24, 2018 at 04:04 Jain, Ankit wrote: > How often is the product db updated? Based on that you can store > product metadata as state in Flink, maybe setup the state on cluster > startup and then update daily etc. > > > > Also, just based on this feature, flink doesn’t seem to add a lot of > value on top of Kafka. As Jorn said below, you can very well store all the > events in an external store and then periodically run a cron to enrich > later since your processing doesn’t seem to require absolute real time. > > > > Thanks > > Ankit > > > > *From: *Jörn Franke > *Date: *Monday, July 23, 2018 at 10:10 PM > *To: *Harshvardhan Agrawal > *Cc: * > *Subject: *Re: Implement Joins with Lookup Data > > > > For the first one (lookup of single entries) you could use a NoSQL db > (eg key value store) - a relational database will not scale. > > > > Depending on when you need to do the enrichment you could also first > store the data and enrich it later as part of a batch process. 
> > > On 24. Jul 2018, at 05:25, Harshvardhan Agrawal < > harshvardhan.ag...@gmail.com> wrote: > > Hi, > > > > We are using Flink for financial data enrichment and aggregations. We > have Positions data that we are currently receiving from Kafka. We want to > enrich that data with reference data like Product and Account information > that is present in a relational database. From my understanding of Flink > so > far I think there are two ways to achieve this. Here are two ways to do > it: > > > > 1) First Approach: > > a) Get positions from Kafka and key by product key. > > b) Perform lookup from the database for each key and then obtain > Tuple2 > > > > 2) Second Approach: > > a) Get positions from Kafka and key by product key. > > b) Window the keyed stream into say 15 seconds each. > > c) For each window get the unique product keys and perform a single > lookup. > > d) Somehow join Positions and Products > > > > In the first approach we will be making a lot of calls to the DB and > the solution is very chatty. Its
Re: Flink 1.5 batch job fails to start
Hi Alex, Based on your log information, the potential reason is Hadoop version. To troubleshoot the exception comes from different Hadoop version. I suggest you match the both side of Hadoop version. You can : 1. Upgrade the Hadoop version which Flink Cluster depends on, Flink's official website provides the binary binding Hadoop 2.8.[1] 2. downgrade your fat jar's Hadoop client dependency's version to match Flink Cluster's hadoop dependency's version. [1]: http://www.apache.org/dyn/closer.lua/flink/flink-1.5.1/flink-1.5.1-bin-hadoop28-scala_2.11.tgz Thanks, vino. 2018-07-24 22:59 GMT+08:00 Alex Vinnik : > Hi Till, > > Thanks for responding. Below is entrypoint logs. One thing I noticed that > "Hadoop version: 2.7.3". My fat jar is built with hadoop 2.8 client. Could > it be a reason for that error? If so how can i use same hadoop version 2.8 > on flink server side? BTW job runs fine locally reading from the same s3a > buckets when executed using createLocalEnvironment via java -jar my-fat.jar > --input s3a://foo --output s3a://bar > > Regarding java version. The job is submitted via Flink UI, so it should > not be a problem. > > Thanks a lot in advance. > > 2018-07-24T12:09:38.083+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO > > 2018-07-24T12:09:38.085+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0, > Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC) > 2018-07-24T12:09:38.085+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO OS current user: flink > 2018-07-24T12:09:38.844+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO Current Hadoop/Kerberos user: flink > 2018-07-24T12:09:38.844+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11 > 2018-07-24T12:09:38.844+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO Maximum heap size: 1963 MiBytes > 2018-07-24T12:09:38.844+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO JAVA_HOME: /docker-java-home/jre > 2018-07-24T12:09:38.851+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO Hadoop version: 2.7.3 > 2018-07-24T12:09:38.851+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO JVM Options: > 2018-07-24T12:09:38.851+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO -Xms2048m > 2018-07-24T12:09:38.851+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO -Xmx2048m > 2018-07-24T12:09:38.851+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk. > disableCertChecking > 2018-07-24T12:09:38.851+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO -Dcom.amazonaws.sdk.disableCertChecking > 2018-07-24T12:09:38.851+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO -agentlib:jdwp=transport=dt_socket,server=y,suspend=n, > address=5015 > 2018-07-24T12:09:38.851+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO -Dlog4j.configuration=file:/opt/flink/conf/log4j-console. 
> properties > 2018-07-24T12:09:38.851+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO -Dlogback.configurationFile=file:/opt/flink/conf/logback- > console.xml > 2018-07-24T12:09:38.851+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO Program Arguments: > 2018-07-24T12:09:38.852+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO --configDir > 2018-07-24T12:09:38.852+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO /opt/flink/conf > 2018-07-24T12:09:38.852+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO --executionMode > 2018-07-24T12:09:38.853+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO cluster > 2018-07-24T12:09:38.853+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO --host > 2018-07-24T12:09:38.853+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO cluster > 2018-07-24T12:09:38.853+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO Classpath: /opt/flink/lib/flink-metrics- > datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0. > jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/ > flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/ > flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1. > 7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar::: > 2018-07-24T12:09:38.853+ > [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] > INFO > > 2018-07-24T12:09:38.854+ >
Re: Flink 1.5 batch job fails to start
Hi Till, Thanks for responding. Below is entrypoint logs. One thing I noticed that "Hadoop version: 2.7.3". My fat jar is built with hadoop 2.8 client. Could it be a reason for that error? If so how can i use same hadoop version 2.8 on flink server side? BTW job runs fine locally reading from the same s3a buckets when executed using createLocalEnvironment via java -jar my-fat.jar --input s3a://foo --output s3a://bar Regarding java version. The job is submitted via Flink UI, so it should not be a problem. Thanks a lot in advance. 2018-07-24T12:09:38.083+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO 2018-07-24T12:09:38.085+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC) 2018-07-24T12:09:38.085+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO OS current user: flink 2018-07-24T12:09:38.844+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Current Hadoop/Kerberos user: flink 2018-07-24T12:09:38.844+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11 2018-07-24T12:09:38.844+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Maximum heap size: 1963 MiBytes 2018-07-24T12:09:38.844+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JAVA_HOME: /docker-java-home/jre 2018-07-24T12:09:38.851+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Hadoop version: 2.7.3 2018-07-24T12:09:38.851+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO JVM Options: 2018-07-24T12:09:38.851+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xms2048m 2018-07-24T12:09:38.851+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Xmx2048m 2018-07-24T12:09:38.851+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking 2018-07-24T12:09:38.851+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Dcom.amazonaws.sdk.disableCertChecking 2018-07-24T12:09:38.851+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015 2018-07-24T12:09:38.851+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 2018-07-24T12:09:38.851+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml 2018-07-24T12:09:38.851+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Program Arguments: 2018-07-24T12:09:38.852+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --configDir 2018-07-24T12:09:38.852+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO /opt/flink/conf 2018-07-24T12:09:38.852+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --executionMode 2018-07-24T12:09:38.853+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster 2018-07-24T12:09:38.853+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO --host 2018-07-24T12:09:38.853+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO cluster 2018-07-24T12:09:38.853+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Classpath: 
/opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar::: 2018-07-24T12:09:38.853+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO 2018-07-24T12:09:38.854+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Registered UNIX signal handlers for [TERM, HUP, INT] 2018-07-24T12:09:38.895+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Starting StandaloneSessionClusterEntrypoint. 2018-07-24T12:09:38.895+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install default filesystem. 2018-07-24T12:09:38.927+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Install security context. 2018-07-24T12:09:39.034+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Initializing cluster services. 2018-07-24T12:09:39.059+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Trying to start actor system at flink-jobmanager:6123 2018-07-24T12:09:40.335+ [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO Actor system started at
Re: Implement Joins with Lookup Data
Yes, using Kafka which you initialize with the initial values and then feed changes to the Kafka topic from which you consume could be a solution. On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal < harshvardhan.ag...@gmail.com> wrote: > Hi Till, > > How would we do the initial hydration of the Product and Account data > since it’s currently in a relational DB? Do we have to copy over data to > Kafka and then use them? > > Regards, > Harsh > > On Tue, Jul 24, 2018 at 09:22 Till Rohrmann wrote: > >> Hi Harshvardhan, >> >> I agree with Ankit that this problem could actually be solved quite >> elegantly with Flink's state. If you can ingest the product/account >> information changes as a stream, you can keep the latest version of it in >> Flink state by using a co-map function [1, 2]. One input of the co-map >> function would be the product/account update stream which updates the >> respective entries in Flink's state and the other input stream is the one >> to be enriched. When receiving input from this stream one would lookup the >> latest information contained in the operator's state and join it with the >> incoming event. >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/ >> [2] >> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html >> >> Cheers, >> Till >> >> On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal < >> harshvardhan.ag...@gmail.com> wrote: >> >>> Hi, >>> >>> Thanks for your responses. >>> >>> There is no fixed interval for the data being updated. It’s more like >>> whenever you onboard a new product or there are any mandates that change >>> will trigger the reference data to change. >>> >>> It’s not just the enrichment we are doing here. Once we have enriched >>> the data we will be performing a bunch of aggregations using the enriched >>> data. >>> >>> Which approach would you recommend? >>> >>> Regards, >>> Harshvardhan >>> >>> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit wrote: >>> How often is the product db updated? Based on that you can store product metadata as state in Flink, maybe setup the state on cluster startup and then update daily etc. Also, just based on this feature, flink doesn’t seem to add a lot of value on top of Kafka. As Jorn said below, you can very well store all the events in an external store and then periodically run a cron to enrich later since your processing doesn’t seem to require absolute real time. Thanks Ankit *From: *Jörn Franke *Date: *Monday, July 23, 2018 at 10:10 PM *To: *Harshvardhan Agrawal *Cc: * *Subject: *Re: Implement Joins with Lookup Data For the first one (lookup of single entries) you could use a NoSQL db (eg key value store) - a relational database will not scale. Depending on when you need to do the enrichment you could also first store the data and enrich it later as part of a batch process. On 24. Jul 2018, at 05:25, Harshvardhan Agrawal < harshvardhan.ag...@gmail.com> wrote: Hi, We are using Flink for financial data enrichment and aggregations. We have Positions data that we are currently receiving from Kafka. We want to enrich that data with reference data like Product and Account information that is present in a relational database. From my understanding of Flink so far I think there are two ways to achieve this. Here are two ways to do it: 1) First Approach: a) Get positions from Kafka and key by product key. 
b) Perform lookup from the database for each key and then obtain Tuple2 2) Second Approach: a) Get positions from Kafka and key by product key. b) Window the keyed stream into say 15 seconds each. c) For each window get the unique product keys and perform a single lookup. d) Somehow join Positions and Products In the first approach we will be making a lot of calls to the DB and the solution is very chatty. Its hard to scale this cos the database storing the reference data might not be very responsive. In the second approach, I wish to join the WindowedStream with the SingleOutputStream and turns out I can't join a windowed stream. So I am not quite sure how to do that. I wanted an opinion for what is the right thing to do. Should I go with the first approach or the second one. If the second one, how can I implement the join? -- *Regards, Harshvardhan Agrawal* -- >>> Regards, >>> Harshvardhan >>> >> -- > Regards, > Harshvardhan >
Re: Implement Joins with Lookup Data
Hi Till, How would we do the initial hydration of the Product and Account data since it’s currently in a relational DB? Do we have to copy over data to Kafka and then use them? Regards, Harsh On Tue, Jul 24, 2018 at 09:22 Till Rohrmann wrote: > Hi Harshvardhan, > > I agree with Ankit that this problem could actually be solved quite > elegantly with Flink's state. If you can ingest the product/account > information changes as a stream, you can keep the latest version of it in > Flink state by using a co-map function [1, 2]. One input of the co-map > function would be the product/account update stream which updates the > respective entries in Flink's state and the other input stream is the one > to be enriched. When receiving input from this stream one would lookup the > latest information contained in the operator's state and join it with the > incoming event. > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/ > [2] > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html > > Cheers, > Till > > On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal < > harshvardhan.ag...@gmail.com> wrote: > >> Hi, >> >> Thanks for your responses. >> >> There is no fixed interval for the data being updated. It’s more like >> whenever you onboard a new product or there are any mandates that change >> will trigger the reference data to change. >> >> It’s not just the enrichment we are doing here. Once we have enriched the >> data we will be performing a bunch of aggregations using the enriched data. >> >> Which approach would you recommend? >> >> Regards, >> Harshvardhan >> >> On Tue, Jul 24, 2018 at 04:04 Jain, Ankit wrote: >> >>> How often is the product db updated? Based on that you can store product >>> metadata as state in Flink, maybe setup the state on cluster startup and >>> then update daily etc. >>> >>> >>> >>> Also, just based on this feature, flink doesn’t seem to add a lot of >>> value on top of Kafka. As Jorn said below, you can very well store all the >>> events in an external store and then periodically run a cron to enrich >>> later since your processing doesn’t seem to require absolute real time. >>> >>> >>> >>> Thanks >>> >>> Ankit >>> >>> >>> >>> *From: *Jörn Franke >>> *Date: *Monday, July 23, 2018 at 10:10 PM >>> *To: *Harshvardhan Agrawal >>> *Cc: * >>> *Subject: *Re: Implement Joins with Lookup Data >>> >>> >>> >>> For the first one (lookup of single entries) you could use a NoSQL db >>> (eg key value store) - a relational database will not scale. >>> >>> >>> >>> Depending on when you need to do the enrichment you could also first >>> store the data and enrich it later as part of a batch process. >>> >>> >>> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal < >>> harshvardhan.ag...@gmail.com> wrote: >>> >>> Hi, >>> >>> >>> >>> We are using Flink for financial data enrichment and aggregations. We >>> have Positions data that we are currently receiving from Kafka. We want to >>> enrich that data with reference data like Product and Account information >>> that is present in a relational database. From my understanding of Flink so >>> far I think there are two ways to achieve this. Here are two ways to do it: >>> >>> >>> >>> 1) First Approach: >>> >>> a) Get positions from Kafka and key by product key. >>> >>> b) Perform lookup from the database for each key and then obtain >>> Tuple2 >>> >>> >>> >>> 2) Second Approach: >>> >>> a) Get positions from Kafka and key by product key. 
>>> >>> b) Window the keyed stream into say 15 seconds each. >>> >>> c) For each window get the unique product keys and perform a single >>> lookup. >>> >>> d) Somehow join Positions and Products >>> >>> >>> >>> In the first approach we will be making a lot of calls to the DB and the >>> solution is very chatty. Its hard to scale this cos the database storing >>> the reference data might not be very responsive. >>> >>> >>> >>> In the second approach, I wish to join the WindowedStream with the >>> SingleOutputStream and turns out I can't join a windowed stream. So I am >>> not quite sure how to do that. >>> >>> >>> >>> I wanted an opinion for what is the right thing to do. Should I go with >>> the first approach or the second one. If the second one, how can I >>> implement the join? >>> >>> >>> >>> -- >>> >>> >>> *Regards, Harshvardhan Agrawal* >>> >>> -- >> Regards, >> Harshvardhan >> > -- Regards, Harshvardhan
[ANNOUNCE] Weekly community update #30
Dear community, this is the weekly community update thread #30. Please post any news and updates you want to share with the community to this thread. # First RC for Flink 1.6.0 The community has published the first release candidate for Flink 1.6.0 [1]. Please help the community by trying the RC out and reporting potential problems with it. # First RC for Flink 1.5.2 The community has published the first release candidate for Flink 1.5.2 [2]. This minor release contains several bug fixes which you can find here [3]. The community would highly appreciate it if you could try this RC out. # Program for Flink Forward Berlin 2018 The program for Flink Forward Berlin 2018 has been announced [4]. As every year, Flink Forward Berlin is a great opportunity for the community to meet each other in person, learn new things about Flink and its use cases and to discuss the project's future direction. [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-6-0-release-candidate-1-tp23440.html [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-5-2-release-candidate-1-tp23446.html [3] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12343588 [4] https://berlin-2018.flink-forward.org/conference-program Cheers, Till
Re: Implement Joins with Lookup Data
Hi Harshvardhan, I agree with Ankit that this problem could actually be solved quite elegantly with Flink's state. If you can ingest the product/account information changes as a stream, you can keep the latest version of it in Flink state by using a co-map function [1, 2]. One input of the co-map function would be the product/account update stream which updates the respective entries in Flink's state and the other input stream is the one to be enriched. When receiving input from this stream one would lookup the latest information contained in the operator's state and join it with the incoming event. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/ [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html Cheers, Till On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal < harshvardhan.ag...@gmail.com> wrote: > Hi, > > Thanks for your responses. > > There is no fixed interval for the data being updated. It’s more like > whenever you onboard a new product or there are any mandates that change > will trigger the reference data to change. > > It’s not just the enrichment we are doing here. Once we have enriched the > data we will be performing a bunch of aggregations using the enriched data. > > Which approach would you recommend? > > Regards, > Harshvardhan > > On Tue, Jul 24, 2018 at 04:04 Jain, Ankit wrote: > >> How often is the product db updated? Based on that you can store product >> metadata as state in Flink, maybe setup the state on cluster startup and >> then update daily etc. >> >> >> >> Also, just based on this feature, flink doesn’t seem to add a lot of >> value on top of Kafka. As Jorn said below, you can very well store all the >> events in an external store and then periodically run a cron to enrich >> later since your processing doesn’t seem to require absolute real time. >> >> >> >> Thanks >> >> Ankit >> >> >> >> *From: *Jörn Franke >> *Date: *Monday, July 23, 2018 at 10:10 PM >> *To: *Harshvardhan Agrawal >> *Cc: * >> *Subject: *Re: Implement Joins with Lookup Data >> >> >> >> For the first one (lookup of single entries) you could use a NoSQL db (eg >> key value store) - a relational database will not scale. >> >> >> >> Depending on when you need to do the enrichment you could also first >> store the data and enrich it later as part of a batch process. >> >> >> On 24. Jul 2018, at 05:25, Harshvardhan Agrawal < >> harshvardhan.ag...@gmail.com> wrote: >> >> Hi, >> >> >> >> We are using Flink for financial data enrichment and aggregations. We >> have Positions data that we are currently receiving from Kafka. We want to >> enrich that data with reference data like Product and Account information >> that is present in a relational database. From my understanding of Flink so >> far I think there are two ways to achieve this. Here are two ways to do it: >> >> >> >> 1) First Approach: >> >> a) Get positions from Kafka and key by product key. >> >> b) Perform lookup from the database for each key and then obtain >> Tuple2 >> >> >> >> 2) Second Approach: >> >> a) Get positions from Kafka and key by product key. >> >> b) Window the keyed stream into say 15 seconds each. >> >> c) For each window get the unique product keys and perform a single >> lookup. >> >> d) Somehow join Positions and Products >> >> >> >> In the first approach we will be making a lot of calls to the DB and the >> solution is very chatty. 
Its hard to scale this cos the database storing >> the reference data might not be very responsive. >> >> >> >> In the second approach, I wish to join the WindowedStream with the >> SingleOutputStream and turns out I can't join a windowed stream. So I am >> not quite sure how to do that. >> >> >> >> I wanted an opinion for what is the right thing to do. Should I go with >> the first approach or the second one. If the second one, how can I >> implement the join? >> >> >> >> -- >> >> >> *Regards, Harshvardhan Agrawal* >> >> -- > Regards, > Harshvardhan >
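A minimal sketch of the state-backed enrichment described above (Scala; ProductInfo/Position and their fields are assumed shapes, and a RichCoFlatMapFunction is used in place of a plain CoMapFunction so that the reference-data input does not have to emit anything):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

// Assumed shapes for illustration only.
case class ProductInfo(productKey: String, name: String)
case class Position(productKey: String, quantity: Long)
case class EnrichedPosition(position: Position, product: Option[ProductInfo])

// Input 1 carries reference-data updates, input 2 the positions to enrich.
class EnrichmentFunction
    extends RichCoFlatMapFunction[ProductInfo, Position, EnrichedPosition] {

  private lazy val productState: ValueState[ProductInfo] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[ProductInfo]("latest-product", classOf[ProductInfo]))

  // Reference data update: just remember the latest version for this key.
  override def flatMap1(product: ProductInfo, out: Collector[EnrichedPosition]): Unit =
    productState.update(product)

  // Stream to enrich: look up whatever reference data has been seen for this key.
  override def flatMap2(position: Position, out: Collector[EnrichedPosition]): Unit =
    out.collect(EnrichedPosition(position, Option(productState.value())))
}

// Usage -- both streams keyed by the same key so the state lookups line up:
// productUpdates.keyBy(_.productKey)
//   .connect(positions.keyBy(_.productKey))
//   .flatMap(new EnrichmentFunction)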
Re: S3 file source - continuous monitoring - many files missed
Hi, The problem is that Flink tracks which files it has read by remembering the modification time of the file that was added (or modified) last. We use the modification time, to avoid that we have to remember the names of all files that were ever consumed, which would be expensive to check and store over time. One could change this logic to a hybrid approach that keeps the names of all files that have a mod timestamp that is larger than the max mod time minus an offset. It would be great if you could open a Jira issue for this problem. Thanks, Fabian 2018-07-24 14:58 GMT+02:00 Averell : > Hello Jörn. > > Thanks for your help. > "/Probably the system is putting them to the folder and Flink is triggered > before they are consistent./" <<< yes, I also guess so. However, if Flink > is > triggered before they are consistent, either (a) there should be some error > messages, or (b) Flink should be able to identify those files in the > subsequent triggers. But in my case, those files are missed forever. > > Right now those files for S3 are to be consumed by Flink only. The flow is > as follow: >Existing system >>> S3 >>> Flink >>> Elastic Search. > If I cannot find a solution to the mentioned problem, I might need to > change > to: >Existing system >>> Kinesis >>> Flink >>> Elastic Search > Or >Existing system >>> S3 >>> Kinesis >>> Flink >>> Elastic > Search > Or >Existing system >>> S3 >>> Custom File Source + Flink >>> > Elastic > Search > However, all those solutions would take much more effort. > > Thanks! > > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050. > n4.nabble.com/ >
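A rough sketch of the hybrid bookkeeping Fabian describes -- not the actual ContinuousFileMonitoringFunction code, just an illustration of remembering the maximum modification time plus the names of files whose timestamps fall within an offset of it:

// Sketch only: besides the max modification time seen so far, keep the names of
// recently modified files so that late-appearing files within `offsetMs` are
// neither re-read nor missed.
class ProcessedFileTracker(offsetMs: Long) {

  private var maxModTime: Long = Long.MinValue
  // file path -> modification time, only for files newer than (maxModTime - offsetMs)
  private var recentFiles: Map[String, Long] = Map.empty

  /** Returns true if the file should be read, and records it as processed. */
  def shouldProcess(path: String, modTime: Long): Boolean = {
    if (modTime < maxModTime - offsetMs) {
      false                                   // older than anything we still track
    } else if (recentFiles.contains(path)) {
      false                                   // already consumed
    } else {
      recentFiles += (path -> modTime)
      maxModTime = math.max(maxModTime, modTime)
      // drop entries that fell out of the tracked window
      recentFiles = recentFiles.filter { case (_, t) => t >= maxModTime - offsetMs }
      true
    }
  }
}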
Re: S3 file source - continuous monitoring - many files missed
Hello Jörn. Thanks for your help. "/Probably the system is putting them to the folder and Flink is triggered before they are consistent./" <<< yes, I also guess so. However, if Flink is triggered before they are consistent, either (a) there should be some error messages, or (b) Flink should be able to identify those files in the subsequent triggers. But in my case, those files are missed forever. Right now those files for S3 are to be consumed by Flink only. The flow is as follow: Existing system >>> S3 >>> Flink >>> Elastic Search. If I cannot find a solution to the mentioned problem, I might need to change to: Existing system >>> Kinesis >>> Flink >>> Elastic Search Or Existing system >>> S3 >>> Kinesis >>> Flink >>> Elastic Search Or Existing system >>> S3 >>> Custom File Source + Flink >>> Elastic Search However, all those solutions would take much more effort. Thanks! -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Memory Logging
Hi Oliver, which Flink image are you using? If you are using the docker image from docker hub [1], then the memory logging will go to stdout and not to a log file. The reason for this behavior is that the docker image configures the logger to print to stdout such that one can easily access the logs via `docker logs`. If this is the case, then you should find the memory logging statements somewhere in the console output. [1] https://hub.docker.com/r/_/flink/ Cheers, Till On Tue, Jul 24, 2018 at 1:43 PM Oliver Breit wrote: > Hi everyone, > > We are using a simple Flink setup with one jobmanager and one taskmanager > running inside a docker container. We are having issues enabling the > *taskmanager.debug.memory.startLogThread > *setting. We added > *taskmanager.debug.memory.startLogThread: true* > *taskmanager.debug.memory.logIntervalMs: 1000* > to our flink conf (/opt/flink/conf/flink-conf.yaml). > > The console prints > *2018-07-24 09:53:42,497 INFO > org.apache.flink.configuration.GlobalConfiguration- Loading > configuration property: taskmanager.debug.memory.startLogThread, true* > (similarly for logIntervalMs). So it seems that those values are being > read. > > In the log folder, there is only a flink--client-bc7592c47b7b.log file > with no relevant information. We don't see any task- or jobmanager logs. > We've tried adding the env.log.dir and taskmanager.log.path directly to > flink-conf.yaml. We've also added custom log4j.properties to > /opt/flink/conf/ without any apparent success. > > Is there something obvious that I am missing? > > Let me know if you need more information. > > Thanks! > > Best, > Oliver > > >
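If that is the case, the memory statistics should show up directly in the container output, for example (the container name below is an assumption):

docker logs -f flink-taskmanager 2>&1 | grep -i "memory"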
Re: SingleOutputStreamOperator vs DataStream?
Hi Chris, a `DataStream` represents a stream of events which have the same type. A `SingleOutputStreamOperator` is a subclass of `DataStream` and represents a user defined transformation applied to an input `DataStream` and producing an output `DataStream` (represented by itself). Since you can only add a side output to an operator/user defined transformation, you can only access the side output data from a `SingleOutputStreamOperator` and not from a `DataStream`. In this regard, the `SingleOutputStreamOperator` is just a richer version of the `DataStream` which requires a certain context. Cheers, Till On Tue, Jul 24, 2018 at 1:26 PM chrisr123 wrote: > > I'm trying to get a list of late elements in my Tumbling Windows > application > and I noticed > that I need to use SingleOutputStreamOperator instead of DataStream > to > get > access to the .sideOutputLateData(...) method. > > Can someone explain what the difference is between > SingleOutputStreamOperator and DataStream > and why I need to use this for getting the late data? > Thanks! > > Snippet: > OutputTag lateEventsTag = new > OutputTag("late-events") {}; > SingleOutputStreamOperator windowedEvents = eventStream > .keyBy("key") > > .window(TumblingEventTimeWindows.of(Time.seconds(3))) > .sideOutputLateData(lateEventsTag) > .process(new > EventBeanProcessWindowFunction()); > DataStream lateEvents = > windowedEvents.getSideOutput(lateEventsTag); > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >
Re: Questions on Unbounded number of keys
Hi Chang Liu, if you are dealing with an unlimited number of keys and keep state around for every key, then your state size will keep growing with the number of keys. If you are using the FileStateBackend which keeps state in memory, you will eventually run into an OutOfMemoryException. One way to solve/mitigate this problem is to use the RocksDBStateBackend which can go out of core. Alternatively, you would need to clean up your state before you run out of memory. One way to do this is to register for every key a timer which clears the state. But this only works if you don't amass too much state data before the timer is triggered. If you wish this solution is some kind of a poor man's state TTL. The Flink community is currently developing a proper implementation of it which does not rely on additional timers (which increases the state footprint) [1]. [1] https://issues.apache.org/jira/browse/FLINK-9510 Cheers, Till On Tue, Jul 24, 2018 at 10:11 AM Chang Liu wrote: > Dear All, > > I have questions regarding the keys. In general, the questions are: > >- what happens if I am doing keyBy based on unlimited number of keys? >How Flink is managing each KeyedStream under the hood? Will I get memory >overflow, for example, if every KeyStream associated with a specific key is >taking certain amount of memory? >- BTW, I think it is fare to say that, I have to clear my KeyedState >so that the memory used by these State are cleaned up regularly. But still, >I am wondering, even though I am regularly cleaning up State memory, what >happened to memory used by the KeyedStream itself, if there is? And will >they be exploding? > > > Let me give an example for understanding it clearly. Let’s say we have a > > val requestStream: DataStream[HttpRequest] > > which is a stream of HTTP requests. And by using the session ID as the > key, we can obtain a KeyedStream per single session, as following: > > val streamPerSession: KeyedStream[HttpRequest] = > requestStream.keyBy(_.sessionId) > > However, the session IDs are actually a hashcode generated randomly by the > Web service/application, so that means, the number of sessions are > unlimited (which is reasonable, because every time a user open the > application or login, he/she will get a new unique session). > > Then, the question is: will Flink eventually run out of memory because the > number of sessions are unlimited (and because we are keying by the session > ID)? > >- If so, how can we properly manage this situation? >- If not, could you help me understand WHY? >- Let’s also assume that, we are regularly clearing the KeyedState, so >the memory used by the State will not explode. > > > > Many Thanks and Looking forward to your reply :) > > Best regards/祝好, > > Chang Liu 刘畅 > > >
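A sketch of the timer-based cleanup Till describes, applied to the session example from the question (Scala; the HttpRequest shape and the 30-minute TTL are assumptions):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Hypothetical request type; only the session id matters for this sketch.
case class HttpRequest(sessionId: String, uri: String)

// Counts requests per session and clears the per-key state once a session has
// been idle for ttlMs -- the "poor man's state TTL" based on timers.
class SessionCounter(ttlMs: Long) extends ProcessFunction[HttpRequest, (String, Long)] {

  private lazy val count: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("request-count", classOf[java.lang.Long]))
  private lazy val lastSeen: ValueState[java.lang.Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("last-seen", classOf[java.lang.Long]))

  override def processElement(
      req: HttpRequest,
      ctx: ProcessFunction[HttpRequest, (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {
    val newCount = Option(count.value()).map(_.longValue()).getOrElse(0L) + 1
    count.update(newCount)
    out.collect((req.sessionId, newCount))

    // Remember the last activity and register a cleanup timer. Note that this
    // creates one timer per element, which itself adds to the state footprint.
    val now = ctx.timerService().currentProcessingTime()
    lastSeen.update(now)
    ctx.timerService().registerProcessingTimeTimer(now + ttlMs)
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[HttpRequest, (String, Long)]#OnTimerContext,
      out: Collector[(String, Long)]): Unit = {
    // Only clear if no newer request was seen since this timer was registered.
    val last = Option(lastSeen.value()).map(_.longValue())
    if (last.exists(timestamp >= _ + ttlMs)) {
      count.clear()
      lastSeen.clear()
    }
  }
}

// Usage: requestStream.keyBy(_.sessionId).process(new SessionCounter(30 * 60 * 1000L))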
Re: streaming predictions
Dear Cederic, I did something similar as yours a while ago along this work [1] but I've always been working within the batch context. I'm also the co-author of flink-jpmml and, since a flink2pmml model saver library doesn't exist currently, I'd suggest you a twofold strategy to tackle this problem: - if your model is relatively simple, take the batch evaluate method (it belongs to your SVM classifier) and attempt to translate it in a flatMap function (hopefully you can reuse some internal utilities, Flink exploits breeze vector library under the hoods [3]). - if your model is a complex one, you should export the model into PMML and employ then [2]. For a first overview, this [4] is the library you should adopt as to export your model and this [5] can help you with the related implementation. Hope it can help and good luck! Andrea [1] https://dl.acm.org/citation.cfm?id=3070612 [2] https://github.com/FlinkML/flink-jpmml [3] https://github.com/apache/flink/blob/7034e9cfcb051ef90c5bf0960bfb50a79b3723f0/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L73 [4] https://github.com/jpmml/jpmml-model [5] https://github.com/jpmml/jpmml-sparkml 2018-07-24 13:29 GMT+02:00 David Anderson : > One option (which I haven't tried myself) would be to somehow get the > model into PMML format, and then use https://github.com/ > FlinkML/flink-jpmml to score the model. You could either use another > machine learning framework to train the model (i.e., a framework that > directly supports PMML export), or convert the Flink model into PMML. Since > SVMs are fairly simple to describe, that might not be terribly difficult. > > On Mon, Jul 23, 2018 at 4:18 AM Xingcan Cui wrote: > >> Hi Cederic, >> >> If the model is a simple function, you can just load it and make >> predictions using the map/flatMap function in the StreamEnvironment. >> >> But I’m afraid the model trained by Flink-ML should be a “batch job", >> whose predict method takes a Dataset as the parameter and outputs another >> Dataset as the result. That means you cannot easily apply the model on >> streams, at least for now. >> >> There are two options to solve this. (1) Train the dataset using another >> framework to produce a simple function. (2) Adjust your model serving as a >> series of batch jobs. >> >> Hope that helps, >> Xingcan >> >> On Jul 22, 2018, at 8:56 PM, Hequn Cheng wrote: >> >> Hi Cederic, >> >> I am not familiar with SVM or machine learning but I think we can work it >> out together. >> What problem have you met when you try to implement this function? From >> my point of view, we can rebuild the model in the flatMap function and use >> it to predict the input data. There are some flatMap documents here[1]. >> >> Best, Hequn >> >> [1] https://ci.apache.org/projects/flink/flink-docs- >> master/dev/stream/operators/#datastream-transformations >> >> >> >> >> >> On Sun, Jul 22, 2018 at 4:12 PM, Cederic Bosmans >> wrote: >> >>> Dear >>> >>> My name is Cederic Bosmans and I am a masters student at the Ghent >>> University (Belgium). >>> I am currently working on my masters dissertation which involves Apache >>> Flink. >>> >>> I want to make predictions in the streaming environment based on a model >>> trained in the batch environment. 
>>> >>> I trained my SVM-model this way: >>> val svm2 = SVM() >>> svm2.setSeed(1) >>> svm2.fit(trainLV) >>> val testVD = testLV.map(lv => (lv.vector, lv.label)) >>> val evalSet = svm2.evaluate(testVD) >>> >>> and saved the model: >>> val modelSvm = svm2.weightsOption.get >>> >>> Then I have an incoming datastream in the streaming environment: >>> dataStream[(Int, Int, Int)] >>> which should be bininary classified using this trained SVM model. >>> >>> Since the predict function does only support DataSet and not DataStream, >>> on stackoverflow a flink contributor mentioned that this should be done >>> using a map/flatMap function. >>> Unfortunately I am not able to work this function out. >>> >>> It would be incredible for me if you could help me a little bit further! >>> >>> Kind regards and thanks in advance >>> Cederic Bosmans >>> >> >> >> > > -- > *David Anderson* | Training Coordinator | data Artisans > -- > Join Flink Forward - The Apache Flink Conference > Stream Processing | Event Driven | Real Time > -- *Andrea Spina* Software Engineer @ Radicalbit Srl Via Borsieri 41, 20159, Milano - IT
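A rough sketch of the first option above -- scoring a linear SVM inside a map function. It assumes the decision value is the dot product of the trained weights and the three features, ignores any intercept term, and assumes the raw weights can be extracted as an Array[Double] (the `.data` accessor below is an assumption; check the actual type behind weightsOption in your FlinkML version):

// Pure scoring function; the weights array is captured in the closure.
def classify(weights: Array[Double])(features: (Int, Int, Int)): (Int, Int, Int, Double) = {
  val x = Array(features._1.toDouble, features._2.toDouble, features._3.toDouble)
  val score = weights.zip(x).map { case (w, xi) => w * xi }.sum   // w . x
  val label = if (score >= 0) 1.0 else -1.0                        // sign-based decision
  (features._1, features._2, features._3, label)
}

// In the streaming job:
// val weights: Array[Double] = modelSvm.data        // assumed accessor
// val predictions = dataStream.map(classify(weights) _)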
Re: NoClassDefFoundError when running Twitter Example
Hi Syed, could you check whether this class is actually contained in the twitter example jar? If not, then you have to build an uber jar containing all required dependencies. Cheers, Till On Tue, Jul 24, 2018 at 5:11 AM syed wrote: > I am facing the *java.lang.NoClassDefFoundError: > com/twitter/hbc/httpclient/auth/Authentication* > error when running the tweeter example. > > The example works well with the sample data, but I am unable to run it with > real tweet data. > Please guide me how to fix this issue. I am running Flink 1.3.2. > Thanks > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >
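For illustration, a minimal maven-shade-plugin configuration for building such an uber jar could look like the following (the version number is only an example; Flink's own core dependencies are typically marked as provided so they are not bundled, while connector dependencies such as the Twitter connector and com.twitter:hbc are):

<!-- Illustrative only: bundles the job and its non-provided dependencies at `mvn package`. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.1.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>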
Re: Flink 1.5 batch job fails to start
Hi Alex, I'm not entirely sure what causes this problem because it is the first time I see it. First question would be if the problem also arises if using a different Hadoop version. Are you using the same Java versions on the client as well as on the server? Could you provide us with the cluster entrypoint logs? Cheers, Till On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik wrote: > Trying to migrate existing job that runs fine on Flink 1.3.1 to Flink 1.5 > and getting a weird exception. > > Job reads json from s3a and writes parquet files to s3a with avro model. > Job is uber jar file built with hadoop-aws-2.8.0 in order to have access to > S3AFileSystem class. > > Fails here > > https://github.com/apache/flink/blob/release-1.5.0/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java#L288 > with > Caused by: java.lang.Exception: Deserializing the OutputFormat > (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) > failed: unread block data > > To be exact it fails right on that line. > > https://github.com/apache/flink/blob/release-1.5.0/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java#L488 > > Not sure how to resolve this problem. Looking for an advice. Let me know > if more info is needed. Full stack is below. Thanks. > > org.apache.flink.runtime.rest.handler.RestHandlerException: > org.apache.flink.util.FlinkException: Failed to submit job > 13a1478cbc7ec20f93f9ee0947856bfd. > at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141) > at > java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at > java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811) > at akka.dispatch.OnComplete.internal(Future.scala:258) > at akka.dispatch.OnComplete.internal(Future.scala:256) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > at > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83) > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534) > at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20) > at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18) > at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) > at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at > akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) > at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.util.concurrent.CompletionException: > org.apache.flink.util.FlinkException: Failed to submit job > 13a1478cbc7ec20f93f9ee0947856bfd. > at > java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) > at > java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) > at > java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) > at > java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899) > ... 29 more > Caused by: org.apache.flink.util.FlinkException: Failed to submit job > 13a1478cbc7ec20f93f9ee0947856bfd. > at > org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at >
Re: Implement Joins with Lookup Data
Hi, Thanks for your responses. There is no fixed interval for the data being updated. It’s more like whenever you onboard a new product or there are any mandates that change will trigger the reference data to change. It’s not just the enrichment we are doing here. Once we have enriched the data we will be performing a bunch of aggregations using the enriched data. Which approach would you recommend? Regards, Harshvardhan On Tue, Jul 24, 2018 at 04:04 Jain, Ankit wrote: > How often is the product db updated? Based on that you can store product > metadata as state in Flink, maybe setup the state on cluster startup and > then update daily etc. > > > > Also, just based on this feature, flink doesn’t seem to add a lot of value > on top of Kafka. As Jorn said below, you can very well store all the events > in an external store and then periodically run a cron to enrich later since > your processing doesn’t seem to require absolute real time. > > > > Thanks > > Ankit > > > > *From: *Jörn Franke > *Date: *Monday, July 23, 2018 at 10:10 PM > *To: *Harshvardhan Agrawal > *Cc: * > *Subject: *Re: Implement Joins with Lookup Data > > > > For the first one (lookup of single entries) you could use a NoSQL db (eg > key value store) - a relational database will not scale. > > > > Depending on when you need to do the enrichment you could also first store > the data and enrich it later as part of a batch process. > > > On 24. Jul 2018, at 05:25, Harshvardhan Agrawal < > harshvardhan.ag...@gmail.com> wrote: > > Hi, > > > > We are using Flink for financial data enrichment and aggregations. We have > Positions data that we are currently receiving from Kafka. We want to > enrich that data with reference data like Product and Account information > that is present in a relational database. From my understanding of Flink so > far I think there are two ways to achieve this. Here are two ways to do it: > > > > 1) First Approach: > > a) Get positions from Kafka and key by product key. > > b) Perform lookup from the database for each key and then obtain > Tuple2 > > > > 2) Second Approach: > > a) Get positions from Kafka and key by product key. > > b) Window the keyed stream into say 15 seconds each. > > c) For each window get the unique product keys and perform a single lookup. > > d) Somehow join Positions and Products > > > > In the first approach we will be making a lot of calls to the DB and the > solution is very chatty. Its hard to scale this cos the database storing > the reference data might not be very responsive. > > > > In the second approach, I wish to join the WindowedStream with the > SingleOutputStream and turns out I can't join a windowed stream. So I am > not quite sure how to do that. > > > > I wanted an opinion for what is the right thing to do. Should I go with > the first approach or the second one. If the second one, how can I > implement the join? > > > > -- > > > *Regards, Harshvardhan Agrawal* > > -- Regards, Harshvardhan
Re: S3 file source - continuous monitoring - many files missed
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html You will find there a passage on the consistency model. Probably the system is putting them to the folder and Flink is triggered before they are consistent. What happens after Flink puts them on S3? Are they reused by another system? Or is it just archival? If they are reused then probably go for a NoSQL solution (eg Dynamo), if they are just archived then use Kinesis + S3. > On 24. Jul 2018, at 11:52, Averell wrote: > > Could you please help explain more details on "/try read after write > consistency (assuming the files are not modified) /"? > I guess that the problem I got comes from the inconsistency in S3 files > listing. Otherwise, I would have got exceptions on file not found. > > My use case is to read output files from another system. That system was > built some years back, and is outputting files to their S3 bucket. There is > no file modification, only new files are being created. We want to avoid > modifying that system. > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: When a jobmanager fails, it doesn't restart because it tries to restart non existing tasks
Hi Gerard, the first log snippet from the client does not show anything suspicious. The warning just says that you cannot use the Yarn CLI because it lacks the Hadoop dependencies in the classpath. The second snippet is indeed more interesting. If the TaskExecutors are not notified about the changed leader, then this might indicate a problem with the ZooKeeper connection or the ZooKeeper cluster itself. This might also explain why the job deletion from ZooKeeper does not succeed. One thing you could check is whether the leader ZNode under `/flink/default/leader/dispatcher_lock` (if you are using the defaults) actually contains the address of the newly elected leader. The leader path should also be logged in the cluster entrypoint logs. You can use the ZooKeeper cli for accessing the ZNodes. Cheers, Till On Mon, Jul 23, 2018 at 4:07 PM Gerard Garcia wrote: > We have just started experiencing a different problem that could be > related, maybe it helps to diagnose the issue. > > In the last 24h the jobmanager lost connection to Zookeeper a couple of > times. Each time, a new jobmanager (in a different node) was elected leader > correctly but the taskamangers kept trying to connect to the old > jobmanager. These are the ending log messages until the taskamanger shut > down itself. > > 12:06:41.747 [flink-akka.actor.default-dispatcher-5] WARN > akka.remote.transport.netty.NettyTransport New I/O boss #3 - Remote > connection to [null] failed with java.net.ConnectException: Connection > refused: (...)1/192.168.1.9:35605 > 12:06:41.748 [flink-akka.actor.default-dispatcher-2] INFO > org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve > ResourceManager address akka.tcp://flink@(...)1:35605/user/resourcemanager, > retrying in 1 ms: Could not connect to rpc endpoint under address > akka.tcp://flink@(...)1:35605/user/resourcemanager.. > 12:06:41.748 [flink-akka.actor.default-dispatcher-5] WARN > akka.remote.ReliableDeliverySupervisor > flink-akka.remote.default-remote-dispatcher-15 - Association with remote > system [akka.tcp://flink@(...)1:35605] has failed, address is now gated > for [50] ms. Reason: [Association failed with [akka.tcp://flink@(...)1:35605]] > Caused by: [Connection refused: (...)1/192.168.1.9:35605] > 12:06:51.766 [flink-akka.actor.default-dispatcher-5] WARN > akka.remote.transport.netty.NettyTransport New I/O boss #3 - Remote > connection to [null] failed with java.net.ConnectException: Connection > refused: (...)1/192.168.1.9:35605 > 12:06:51.767 [flink-akka.actor.default-dispatcher-2] INFO > org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve > ResourceManager address akka.tcp://flink@(...)1:35605/user/resourcemanager, > retrying in 1 ms: Could not connect to rpc endpoint under address > akka.tcp://flink@(...)1:35605/user/resourcemanager.. > 12:06:51.767 [flink-akka.actor.default-dispatcher-5] WARN > akka.remote.ReliableDeliverySupervisor > flink-akka.remote.default-remote-dispatcher-7 - Association with remote > system [akka.tcp://flink@(...)1:35605] has failed, address is now gated > for [50] ms. Reason: [Association failed with [akka.tcp://flink@(...)1:35605]] > Caused by: [Connection refused: (...)1/192.168.1.9:35605] > 12:07:01.123 [flink-akka.actor.default-dispatcher-5] ERROR > org.apache.flink.runtime.taskexecutor.TaskExecutor - Fatal error occurred > in TaskExecutor akka.tcp://flink@(...)2:33455/user/taskmanager_0. 
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: > Could not register at the ResourceManager within the specified maximum > registration duration 30 ms. This indicates a problem with this > instance. Terminating now. > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1018) > at > org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$3(TaskExecutor.java:1004) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at >
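For illustration, inspecting the leader ZNode with the ZooKeeper CLI could look like this (paths assume the default cluster-id; the ZNode payload is partly binary Java serialization, but the leader's akka address should be readable inside it):

# Illustrative only -- host and port are assumptions.
bin/zkCli.sh -server <zookeeper-host>:2181
ls /flink/default/leader
get /flink/default/leader/dispatcher_lock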
Memory Logging
Hi everyone, We are using a simple Flink setup with one jobmanager and one taskmanager running inside a docker container. We are having issues enabling the *taskmanager.debug.memory.startLogThread *setting. We added *taskmanager.debug.memory.startLogThread: true* *taskmanager.debug.memory.logIntervalMs: 1000* to our flink conf (/opt/flink/conf/flink-conf.yaml). The console prints *2018-07-24 09:53:42,497 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.debug.memory.startLogThread, true* (similarly for logIntervalMs). So it seems that those values are being read. In the log folder, there is only a flink--client-bc7592c47b7b.log file with no relevant information. We don't see any task- or jobmanager logs. We've tried adding the env.log.dir and taskmanager.log.path directly to flink-conf.yaml. We've also added custom log4j.properties to /opt/flink/conf/ without any apparent success. Is there something obvious that I am missing? Let me know if you need more information. Thanks! Best, Oliver
Re: streaming predictions
One option (which I haven't tried myself) would be to somehow get the model into PMML format, and then use https://github.com/FlinkML/flink-jpmml to score the model. You could either use another machine learning framework to train the model (i.e., a framework that directly supports PMML export), or convert the Flink model into PMML. Since SVMs are fairly simple to describe, that might not be terribly difficult. On Mon, Jul 23, 2018 at 4:18 AM Xingcan Cui wrote: > Hi Cederic, > > If the model is a simple function, you can just load it and make > predictions using the map/flatMap function in the StreamEnvironment. > > But I’m afraid the model trained by Flink-ML should be a “batch job", > whose predict method takes a Dataset as the parameter and outputs another > Dataset as the result. That means you cannot easily apply the model on > streams, at least for now. > > There are two options to solve this. (1) Train the dataset using another > framework to produce a simple function. (2) Adjust your model serving as a > series of batch jobs. > > Hope that helps, > Xingcan > > On Jul 22, 2018, at 8:56 PM, Hequn Cheng wrote: > > Hi Cederic, > > I am not familiar with SVM or machine learning but I think we can work it > out together. > What problem have you met when you try to implement this function? From my > point of view, we can rebuild the model in the flatMap function and use it > to predict the input data. There are some flatMap documents here[1]. > > Best, Hequn > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations > > > > > > On Sun, Jul 22, 2018 at 4:12 PM, Cederic Bosmans > wrote: > >> Dear >> >> My name is Cederic Bosmans and I am a masters student at the Ghent >> University (Belgium). >> I am currently working on my masters dissertation which involves Apache >> Flink. >> >> I want to make predictions in the streaming environment based on a model >> trained in the batch environment. >> >> I trained my SVM-model this way: >> val svm2 = SVM() >> svm2.setSeed(1) >> svm2.fit(trainLV) >> val testVD = testLV.map(lv => (lv.vector, lv.label)) >> val evalSet = svm2.evaluate(testVD) >> >> and saved the model: >> val modelSvm = svm2.weightsOption.get >> >> Then I have an incoming datastream in the streaming environment: >> dataStream[(Int, Int, Int)] >> which should be bininary classified using this trained SVM model. >> >> Since the predict function does only support DataSet and not DataStream, >> on stackoverflow a flink contributor mentioned that this should be done >> using a map/flatMap function. >> Unfortunately I am not able to work this function out. >> >> It would be incredible for me if you could help me a little bit further! >> >> Kind regards and thanks in advance >> Cederic Bosmans >> > > > -- *David Anderson* | Training Coordinator | data Artisans -- Join Flink Forward - The Apache Flink Conference Stream Processing | Event Driven | Real Time
SingleOutputStreamOperator vs DataStream?
I'm trying to get a list of late elements in my Tumbling Windows application and I noticed that I need to use SingleOutputStreamOperator instead of DataStream to get access to the .sideOutputLateData(...) method. Can someone explain what the difference is between SingleOutputStreamOperator and DataStream and why I need to use this for getting the late data? Thanks! Snippet: OutputTag lateEventsTag = new OutputTag("late-events") {}; SingleOutputStreamOperator windowedEvents = eventStream .keyBy("key") .window(TumblingEventTimeWindows.of(Time.seconds(3))) .sideOutputLateData(lateEventsTag) .process(new EventBeanProcessWindowFunction()); DataStream lateEvents = windowedEvents.getSideOutput(lateEventsTag); -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: S3 file source - continuous monitoring - many files missed
Could you please help explain more details on "/try read after write consistency (assuming the files are not modified) /"? I guess that the problem I got comes from the inconsistency in S3 files listing. Otherwise, I would have got exceptions on file not found. My use case is to read output files from another system. That system was built some years back, and is outputting files to their S3 bucket. There is no file modification, only new files are being created. We want to avoid modifying that system. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: S3 file source - continuous monitoring - many files missed
Sure, Kinesis is another way. Can you try read after write consistency (assuming the files are not modified)? In any case it looks like you would be better suited with a NoSQL store or Kinesis (I don’t know your exact use case in order to provide you more details). > On 24. Jul 2018, at 09:51, Averell wrote: > > Just some update: I tried to enable "EMRFS Consistent View" option, but it > didn't help. Not sure whether that's what you recommended, or something > else. > > Thanks! > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Questions on Unbounded number of keys
Dear All, I have questions regarding the keys. In general, the questions are: what happens if I am doing keyBy based on an unlimited number of keys? How is Flink managing each KeyedStream under the hood? Will I get memory overflow, for example, if every KeyedStream associated with a specific key is taking a certain amount of memory? BTW, I think it is fair to say that I have to clear my KeyedState so that the memory used by this State is cleaned up regularly. But still, I am wondering, even though I am regularly cleaning up State memory, what happens to the memory used by the KeyedStream itself, if there is any? And will it keep growing? Let me give an example for understanding it clearly. Let’s say we have a val requestStream: DataStream[HttpRequest] which is a stream of HTTP requests. And by using the session ID as the key, we can obtain a KeyedStream per single session, as follows: val streamPerSession: KeyedStream[HttpRequest] = requestStream.keyBy(_.sessionId) However, the session IDs are actually hashcodes generated randomly by the Web service/application, so that means the number of sessions is unlimited (which is reasonable, because every time a user opens the application or logs in, he/she will get a new unique session). Then, the question is: will Flink eventually run out of memory because the number of sessions is unlimited (and because we are keying by the session ID)? If so, how can we properly manage this situation? If not, could you help me understand WHY? Let’s also assume that we are regularly clearing the KeyedState, so the memory used by the State will not explode. Many thanks and looking forward to your reply :) Best regards/祝好, Chang Liu 刘畅
Re: Implement Joins with Lookup Data
How often is the product db updated? Based on that you can store product metadata as state in Flink, maybe setup the state on cluster startup and then update daily etc. Also, just based on this feature, flink doesn’t seem to add a lot of value on top of Kafka. As Jorn said below, you can very well store all the events in an external store and then periodically run a cron to enrich later since your processing doesn’t seem to require absolute real time. Thanks Ankit From: Jörn Franke Date: Monday, July 23, 2018 at 10:10 PM To: Harshvardhan Agrawal Cc: Subject: Re: Implement Joins with Lookup Data For the first one (lookup of single entries) you could use a NoSQL db (eg key value store) - a relational database will not scale. Depending on when you need to do the enrichment you could also first store the data and enrich it later as part of a batch process. On 24. Jul 2018, at 05:25, Harshvardhan Agrawal mailto:harshvardhan.ag...@gmail.com>> wrote: Hi, We are using Flink for financial data enrichment and aggregations. We have Positions data that we are currently receiving from Kafka. We want to enrich that data with reference data like Product and Account information that is present in a relational database. From my understanding of Flink so far I think there are two ways to achieve this. Here are two ways to do it: 1) First Approach: a) Get positions from Kafka and key by product key. b) Perform lookup from the database for each key and then obtain Tuple2 2) Second Approach: a) Get positions from Kafka and key by product key. b) Window the keyed stream into say 15 seconds each. c) For each window get the unique product keys and perform a single lookup. d) Somehow join Positions and Products In the first approach we will be making a lot of calls to the DB and the solution is very chatty. Its hard to scale this cos the database storing the reference data might not be very responsive. In the second approach, I wish to join the WindowedStream with the SingleOutputStream and turns out I can't join a windowed stream. So I am not quite sure how to do that. I wanted an opinion for what is the right thing to do. Should I go with the first approach or the second one. If the second one, how can I implement the join? -- Regards, Harshvardhan Agrawal
Re: S3 file source - continuous monitoring - many files missed
Just some update: I tried to enable "EMRFS Consistent View" option, but it didn't help. Not sure whether that's what you recommended, or something else. Thanks! -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Question regarding State in full outer join
Hi Darshan, The join implementation in SQL / Table API does what is demanded by the SQL semantics. Hence, what results to emit and also what data to store (state) to compute these results is pretty much given. You can think of the semantics of the join as writing both streams into a relational DBMS and executing the join on the DBMS. The DBMS computes the join result on all data at once. In contrast, Flink computes the results continuously whenever a new record arrives. It might be that your join requirements do not match the semantics of a SQL join. In that case, you might be better off with a custom implementation based on a ProcessFunction as Vino pointed out. Btw. from your description, it looks like your use case could be addressed by the time-versioned / enrichment join that is currently being developed [1] for SQL / Table API. Best, Fabian [1] https://issues.apache.org/jira/browse/FLINK-9712 2018-07-24 8:17 GMT+02:00 vino yang : > Hi Darshan, > > In your use case, I think you can implement the outer join with DataStream > API ( use State + ProcessFunction + Timer ). Using suitable statue, you can > store 1 value per key and do not need to keep all the value's history for > every key. > > And you can refer to Flink's implementation of DataStream join[1]. > > [1]: https://github.com/apache/flink/blob/master/ > flink-libraries/flink-table/src/main/scala/org/apache/ > flink/table/plan/nodes/datastream/DataStreamJoin.scala#L223 > > Thanks, vino. > > 2018-07-24 1:28 GMT+08:00 Darshan Singh : > >> Hi >> >> I was looking at the new full outer join. This seems to be working fine >> for my use case however I have a question regarding the state size. >> >> I have 2 streams each will have 100's of million unique keys. Also, Each >> of these will get the updated value of keys 100's of times per day. >> >> As per my understanding in full outer join flink will keep all the values >> of the keys which it has seen in the state and whenever a new value comes >> from >> 1 of the stream. It will be joined against all of the key values which >> were there for 2nd stream.It could be 1 or 100's of rows. This seems >> inefficient >> but my question is more on the state side. Thus, I will need to keep >> billion's of values in state on both side. This will be very expensive. >> >> It is a non windowed join. A key can recieve updates for 50-60 days and >> after that it wont get any updates on any of the streams. >> >> Is there a way we could use a state such that only 1 value per key is >> retained in the state to reduce the size of the state? >> >> I am using the Table API but could use the Datastream api if needed. >> >> Thanks >> > >
Re: S3 file source - continuous monitoring - many files missed
Hi Jörn, Thanks. I had missed that EMRFS strong consistency configuration. Will try that now. We also had a backup solution - using Kinesis instead of S3 (I don't see Kinesis in your suggestion, but hope that it would be alright). "/The small size and high rate is not suitable for S3 or HDFS/" <<< regarding this, are there any guidelines on how big the file size should be before we should consider S3/HDFS? Thanks a lot. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: S3 file source - continuous monitoring - many files missed
It could be related to S3, which seems to be configured for eventual consistency. Maybe it helps to configure strong consistency. However, I recommend replacing S3 with a NoSQL database (since you are on Amazon, DynamoDB would help, plus DynamoDB Streams; alternatively SNS or SQS). The small size and high rate is not suitable for S3 or HDFS. > On 24. Jul 2018, at 07:59, Averell wrote: > > Good day everyone, > > I have a Flink job that has an S3 folder as a source, and we keep putting > thousands of small (around 1KB each) gzip files into that folder, with the > rate of about 5000 files per minute. Here is how I created that source in > Scala: > > / val my_input_format = new TextInputFormat(new > org.apache.flink.core.fs.Path(my_path)) >my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter()) >my_input_format.setNestedFileEnumeration(true) > >val my_raw_stream = streamEnv >.readFile(my_input_format, >my_path, >FileProcessingMode.PROCESS_CONTINUOUSLY, >1000) > / > The problem is, with the monitoring interval of 1,000ms as above, about 20% > of the files were missed. From Apache Flink Dashboard, at the subsequent > operators, I could only see ~80% of the total number of files recorded > ("Records sent" column). > > If I increase the monitoring interval, the number of missed files would > reduce. At 5,000ms, it is about 10%, and at 30,000ms, only about 2% missed. > > No WARNING/ERROR recorded though. > > I could not simulate this in HDFS, as I could not reach that high file > writing speed in our cluster. > > Could someone please help. Thank you very much. > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Question regarding State in full outer join
Hi Darshan, In your use case, I think you can implement the outer join with the DataStream API (using State + ProcessFunction + Timer). Using suitable state, you can store one value per key and do not need to keep the full value history for every key. And you can refer to Flink's implementation of the DataStream join [1]. [1]: https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala#L223 Thanks, vino. 2018-07-24 1:28 GMT+08:00 Darshan Singh : > Hi > > I was looking at the new full outer join. This seems to be working fine > for my use case however I have a question regarding the state size. > > I have 2 streams each will have 100's of million unique keys. Also, Each > of these will get the updated value of keys 100's of times per day. > > As per my understanding in full outer join flink will keep all the values > of the keys which it has seen in the state and whenever a new value comes > from > 1 of the stream. It will be joined against all of the key values which > were there for 2nd stream.It could be 1 or 100's of rows. This seems > inefficient > but my question is more on the state side. Thus, I will need to keep > billion's of values in state on both side. This will be very expensive. > > It is a non windowed join. A key can recieve updates for 50-60 days and > after that it wont get any updates on any of the streams. > > Is there a way we could use a state such that only 1 value per key is > retained in the state to reduce the size of the state? > > I am using the Table API but could use the Datastream api if needed. > > Thanks >
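A sketch of the State + ProcessFunction + Timer approach described above, keeping only the latest value per key on each side and dropping a key's state after a retention period (Scala; the record types and the 60-day retention are assumptions):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Hypothetical record types; both streams are keyed by the same id before connect().
case class LeftEvent(id: String, value: String)
case class RightEvent(id: String, value: String)
case class JoinedEvent(left: Option[LeftEvent], right: Option[RightEvent])

// Keeps only the latest value per key on each side, emits a joined record on every
// update, and drops the key's state after `retentionMs` of inactivity (e.g. ~60 days).
class LatestValueOuterJoin(retentionMs: Long)
    extends CoProcessFunction[LeftEvent, RightEvent, JoinedEvent] {

  private lazy val leftState: ValueState[LeftEvent] =
    getRuntimeContext.getState(new ValueStateDescriptor[LeftEvent]("left", classOf[LeftEvent]))
  private lazy val rightState: ValueState[RightEvent] =
    getRuntimeContext.getState(new ValueStateDescriptor[RightEvent]("right", classOf[RightEvent]))
  private lazy val lastUpdate: ValueState[java.lang.Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[java.lang.Long]("last-update", classOf[java.lang.Long]))

  override def processElement1(
      l: LeftEvent,
      ctx: CoProcessFunction[LeftEvent, RightEvent, JoinedEvent]#Context,
      out: Collector[JoinedEvent]): Unit = {
    leftState.update(l)                                      // overwrite: one value per key
    touch(ctx)
    out.collect(JoinedEvent(Some(l), Option(rightState.value())))
  }

  override def processElement2(
      r: RightEvent,
      ctx: CoProcessFunction[LeftEvent, RightEvent, JoinedEvent]#Context,
      out: Collector[JoinedEvent]): Unit = {
    rightState.update(r)
    touch(ctx)
    out.collect(JoinedEvent(Option(leftState.value()), Some(r)))
  }

  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[LeftEvent, RightEvent, JoinedEvent]#OnTimerContext,
      out: Collector[JoinedEvent]): Unit = {
    // Clear only if no newer update arrived since this timer was registered.
    val last = Option(lastUpdate.value()).map(_.longValue())
    if (last.exists(timestamp >= _ + retentionMs)) {
      leftState.clear(); rightState.clear(); lastUpdate.clear()
    }
  }

  private def touch(ctx: CoProcessFunction[LeftEvent, RightEvent, JoinedEvent]#Context): Unit = {
    val now = ctx.timerService().currentProcessingTime()
    lastUpdate.update(now)
    ctx.timerService().registerProcessingTimeTimer(now + retentionMs)
  }
}

// Usage (retention of roughly 60 days):
// leftStream.keyBy(_.id)
//   .connect(rightStream.keyBy(_.id))
//   .process(new LatestValueOuterJoin(60L * 24 * 60 * 60 * 1000))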