Statefun 2.0 questions
Hi all,

I've been using Flink for quite some time now, and for a university project I'm planning to experiment with StateFun. During the walkthrough I ran into some issues I hope you can help me with.

1) Is it correct that the Docker image of StateFun is not yet published? I couldn't find it anywhere, but I was able to run it by building the image myself.
2) The example project using the Python SDK uses Flask to expose a function via POST. Is there also a way to serve GET requests, so that you can trigger a stateful function from, for instance, your browser?
3) Do you expect a significant performance loss when using the Python SDK instead of Java?

Thanks in advance!

Regards,
Wouter
Re: Statefun 2.0 questions
Hi Igal,

Thanks for your quick reply. Getting back to point 2: I was wondering whether you could indeed trigger a stateful function directly from Flask and also get the reply there, instead of using Kafka in between. We want to experiment with running stateful functions behind a front-end (which should be able to trigger a function), but we're a bit afraid that Kafka doesn't scale well if, on the front-end side, a user has to consume all Kafka messages to find the correct reply/output for a certain request/input. Any thoughts?

Thanks in advance,
Wouter

Op do 7 mei 2020 om 10:51 schreef Igal Shilman:

> Hi Wouter!
>
> Glad to read that you have been using Flink for quite some time, and are
> also exploring StateFun!
>
> 1) Yes, that is correct; you can follow the Docker Hub contribution PR at
> [1].
>
> 2) I'm not sure I understand what you mean by triggering from the browser.
> If you mean triggering the function independently of StateFun for testing /
> illustration purposes, you would need to write some JavaScript and perform
> the POST (assuming CORS is enabled). Let me know if you'd like further
> information on how to do that. Broadly speaking, GET is traditionally used
> to get data from a resource and POST to send data (the data being the
> invocation batch in our case).
>
> An easier workaround for you would be to expose another endpoint in your
> Flask application and call your stateful function directly from there
> (possibly populating the function argument with values taken from the
> query params).
>
> 3) I would expect a performance loss when going from the embedded SDK to
> the remote one, simply because the remote function runs in a different
> process and a round trip is required. There are different deployment
> options even for remote functions. For example, they can be co-located
> with the task managers and communicate via the loopback device / a Unix
> domain socket, or they can be deployed behind a load balancer with an
> auto-scaler, reacting to higher request rates / latency increases by
> spinning up new instances (something that is not yet supported with the
> embedded API).
>
> Good luck,
> Igal.
>
> [1] https://github.com/docker-library/official-images/pull/7749
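Igal's suggestion for point 2 (expose an extra endpoint and call the function directly, filling its argument from the query params) can be sketched with nothing but the standard library. A minimal sketch, assuming a hypothetical handler `greet` in place of the actual stateful function logic:

```python
from urllib.parse import parse_qs

def greet(name):
    """Stand-in for the stateful function's handler logic (hypothetical)."""
    return "Hello " + name + "!"

def app(environ, start_response):
    """WSGI app serving GET: the function argument is populated from the
    query parameters instead of a POSTed invocation batch."""
    params = parse_qs(environ.get("QUERY_STRING", ""))
    name = params.get("name", ["world"])[0]
    body = greet(name).encode("utf-8")
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [body]

# To serve it (so a browser can trigger the function):
#   from wsgiref.simple_server import make_server
#   make_server("", 8000, app).serve_forever()
```

In a Flask application this would simply be another `@app.route(...)` with `methods=["GET"]` next to the existing POST handler.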
Re: Statefun 2.0 questions
Hi Igal, all,

In the meantime we found a way to serve Flink stateful functions to a front-end. We decided to add another (set of) Flask application(s) which link to Kafka topics. These Kafka topics then serve as ingress and egress for the StateFun cluster. However, we're wondering how we can scale this cluster. The documentation page provides some nice figures for different setups, but no implementation details. In our case we are using a remote setup, so we have a Docker instance containing the `python-stateful-function` and, of course, the Flink cluster containing a `master` and a `worker`. If I understood correctly, in a remote setting we can scale both the Flink cluster and the `python-stateful-function`. Scaling the Flink cluster is trivial: I can add more workers/task managers (providing more task slots) just by scaling the worker instance. However, how can I scale the stateful function while ensuring that it ends up in the correct Flink job (because we need shared state there)? I tried scaling the Docker instance as well, but that didn't seem to work.

Hope you can give me some leads there.
Thanks in advance!

Kind regards,
Wouter
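The earlier worry that a front-end consumer would have to scan all egress messages to find its reply can be avoided with a correlation ID: attach an ID to each request, have the function echo it in the reply, and match replies against the pending requests of that front-end instance. A minimal in-memory sketch of the pattern (no Kafka involved; the producer call shown in a comment is hypothetical):

```python
import uuid

# Pending requests of this front-end instance, keyed by correlation ID.
pending = {}

def send_request(payload):
    """Attach a correlation ID before producing to the ingress topic."""
    corr_id = str(uuid.uuid4())
    pending[corr_id] = None
    # producer.send("statefun-ingress", key=corr_id, value=payload)  # hypothetical
    return corr_id

def on_egress_record(corr_id, value):
    """Consumer callback for the egress topic: replies meant for other
    instances (unknown IDs) are simply skipped."""
    if corr_id in pending:
        pending[corr_id] = value

# Simulated round trip:
cid = send_request({"name": "world"})
on_egress_record("someone-elses-id", {"greeting": "ignored"})
on_egress_record(cid, {"greeting": "hello world"})
```

Keying the egress topic by the correlation ID also lets you partition it, so each front-end instance consumes only its own partitions rather than the whole topic.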
Re: Statefun 2.0 questions
Dear Igal, all,

Thanks a lot, this is very helpful; I understand the architecture a bit better now. We can just scale the stateful functions, put a load balancer in front, and Flink will contact them. The only part of the scaling I don't understand yet is how to scale the 'Flink side'. If I understand correctly, the Kafka ingress/egress part runs on the Flink cluster and contacts the remote workers through HTTP. How can I scale this Kafka part? For a normal Flink job I would just change the parallelism, but I couldn't really find that option yet. Is there some value I need to set in the module.yaml?

Once again, thanks for the help so far. It has been useful.

Regards,
Wouter

Op wo 13 mei 2020 om 00:03 schreef Igal Shilman:

> Hi Wouter,
>
> Triggering a stateful function from a front-end indeed requires an ingress
> between them, so the way you've approached this is also the way we were
> thinking of. As Gordon mentioned, a potential improvement might be an HTTP
> ingress, which would allow triggering stateful functions directly from the
> front-end servers. But this kind of ingress is not implemented yet.
>
> Regarding scaling: your understanding is correct, you can scale both the
> Flink cluster and the remote "python-stateful-function" cluster
> independently. Scaling the Flink cluster, though, requires taking a
> savepoint, bumping the job parallelism, and starting the cluster with more
> workers from the savepoint taken previously.
>
> Scaling the "python-stateful-function" workers can be done transparently
> to the Flink cluster, but the exact details are deployment specific:
> - For example, the Python workers are a k8s service.
> - Or the Python workers are deployed behind a load balancer.
> - Or you add new entries to the DNS record of your Python worker.
>
> I didn't understand "ensuring that it ends up in the correct Flink job";
> can you please clarify? Flink would be the one contacting the remote
> workers, not the other way around. So as long as the new instances are
> visible to Flink, they will be reached with the same shared state.
>
> I'd recommend watching [1] and the demo at the end, and [2] for a demo
> using stateful functions on AWS Lambda.
>
> [1] https://youtu.be/NF0hXZfUyqE
> [2] https://www.youtube.com/watch?v=tuSylBadNSo
>
> It seems like you are on the correct path!
> Good luck!
> Igal.
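The savepoint-and-rescale cycle Igal describes for the Flink side can be sketched with the standard Flink CLI. The job ID, paths, and jar name below are placeholders, and the exact flags may differ between Flink versions:

```shell
# 1) Trigger a savepoint for the running StateFun job, then cancel it
flink savepoint <job-id> /tmp/savepoints
flink cancel <job-id>

# 2) Add task managers, e.g. by scaling the worker service
docker-compose up --scale worker=4 -d

# 3) Resubmit from the savepoint with a higher parallelism
flink run -s /tmp/savepoints/savepoint-<id> -p 8 <statefun-job-jar>
```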
Akka version conflict running on Flink cluster
Hi,

I think I'm running into an Akka version conflict when running a Flink job on a cluster. The current situation:
- Flink cluster on Flink 1.4.2 (using Docker)
- Flink job which uses the twitter4s [1] library and Akka version 2.5.8

In my Flink job I try to shut down an Akka actor from the twitter4s library. This crashes the whole task manager with the following stack trace:

taskrunner_1 | 2018-06-11 09:03:14,454 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Sink: Unnamed (0ba7f7f259eee06fe2f7d783c868179b)
taskrunner_1 | Uncaught error from thread [twitter4s-streaming-akka.actor.default-dispatcher-288]: loader constraint violation: when resolving method "akka.actor.ActorCell$$anonfun$3.(Lakka/actor/ActorCell;)V" the class loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) of the current class, akka/actor/ActorCell, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, akka/actor/ActorCell$$anonfun$3, have different Class objects for the type akka/actor/ActorCell used in the signature, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[twitter4s-streaming]
taskrunner_1 | java.lang.LinkageError: loader constraint violation: when resolving method "akka.actor.ActorCell$$anonfun$3.(Lakka/actor/ActorCell;)V" the class loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) of the current class, akka/actor/ActorCell, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, akka/actor/ActorCell$$anonfun$3, have different Class objects for the type akka/actor/ActorCell used in the signature
taskrunner_1 |     at akka.actor.ActorCell.invoke(ActorCell.scala:499)
taskrunner_1 |     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
taskrunner_1 |     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
taskrunner_1 |     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
taskrunner_1 |     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
taskrunner_1 |     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
taskrunner_1 |     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
taskrunner_1 |     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
taskrunner_1 | 2018-06-11 09:03:14,984 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
taskrunner_1 | 2018-06-11 09:03:14,985 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
taskrunner_1 | Exception in thread "twitter4s-streaming-shutdown-hook-1" java.lang.NoClassDefFoundError: akka/actor/CoordinatedShutdown$$anonfun$totalTimeout$1
taskrunner_1 |     at akka.actor.CoordinatedShutdown.totalTimeout(CoordinatedShutdown.scala:515)
taskrunner_1 |     at akka.actor.CoordinatedShutdown$$anonfun$initJvmHook$1.apply(CoordinatedShutdown.scala:217)
taskrunner_1 |     at akka.actor.CoordinatedShutdown$$anon$2.run(CoordinatedShutdown.scala:547)
taskrunner_1 | Caused by: java.lang.ClassNotFoundException: akka.actor.CoordinatedShutdown$$anonfun$totalTimeout$1
taskrunner_1 |     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
taskrunner_1 |     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
taskrunner_1 |     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
taskrunner_1 |     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
taskrunner_1 |     ... 3 more

To me, it looks like a version conflict. Any suggestions on how to solve this?

Thanks!
Wouter

[1] Twitter4s: https://github.com/DanielaSfregola/twitter4s/blob/master/build.sbt
Avro serialization and deserialization to Kafka in Scala
Hello all,

I saw the recent updates in Flink related to supporting Avro schema evolution in state, and I'm curious how Flink handles this internally for Scala case classes. I'm working on custom (de)serialization schemas to write to and read from Kafka. However, I'm currently stuck because Avro doesn't natively support Scala. This means that in order to support case-class serialization using Scala-specific types (like Option, Either, etc.) I need a library like Avro4s [1] or AvroHugger [2], which generates schemas at compile time using macros. These macro extensions are extremely slow for complex case classes (a compile time of 15 minutes for a few nested types). I'm looking for an approach that avoids these libraries and am therefore curious how Flink handles this. Does anyone have some good leads on this?

Thanks in advance!

Kind regards,
Wouter Zorgdrager

[1] https://github.com/sksamuel/avro4s
[2] https://github.com/julianpeeters/avrohugger
Using Flink in a university course
Hi all,

I'm working on a setup to use Apache Flink in an assignment for a Big Data (bachelor) university course, and I'm interested in your view on this. To sketch the situation:
- more than 200 students follow this course
- students have to write some (simple) Flink applications using the DataStream API; the focus is on writing the transformation code
- students need to write Scala code
- we provide a dataset and a template (Scala class) with function signatures and a detailed description per application, e.g.:
  def assignment_one(input: DataStream[Event]): DataStream[(String, Int)] = ???
- we provide some setup code, like parsing the data and setting up the streaming environment
- assignments need to be auto-graded, based on correct results

In last year's course edition we approached this with a custom Docker container. This container first compiled the students' code, ran all the Flink applications against a different dataset, and then verified the output against our solutions. This was turned into a grade and reported back to the student. Although this was a working approach, I think we can do better.

I'm wondering if any of you have experience with using Apache Flink in a university course (or have seen this somewhere), as well as with assessing Flink code.

Thanks a lot!

Kind regards,
Wouter Zorgdrager
Re: Using Flink in a university course
Hey all,

Thanks for the replies. The issues we were running into (which are not specific to Docker):
- Students wrongly changing the template failed the container.
- We give full points if the output matches our solutions (and none otherwise), but it would be nice if we could give partial grades per assignment (and better feedback). This would require looking not only at the results but also at the operators used. The pitfall is that in many cases a correct solution can be achieved in multiple ways. I came across a Flink test library [1] which allows testing Flink code more extensively, but it seems to be Java-only.

In retrospect, I do think using Docker is a good approach, as Fabian confirms. However, the way we currently assess student solutions might be improved. I assume that in your trainings manual feedback is given, but unfortunately that is quite difficult for so many students.

Cheers,
Wouter

[1] https://github.com/ottogroup/flink-spector

Op ma 4 mrt. 2019 om 14:39 schreef Fabian Hueske:

> Hi Wouter,
>
> We are using Docker Compose (Flink JM, Flink TM, Kafka, ZooKeeper) setups
> for our trainings and it is working very well. We have an additional
> container that feeds a Kafka topic via the command-line producer to
> simulate somewhat realistic behavior. Of course, you can do it without
> Kafka and use some kind of data-generating source that reads from a file
> that is replaced for evaluation.
>
> The biggest benefit that I see with using Docker is that the students have
> an environment for development and testing that is close to the grading
> situation. You do not need to provide infrastructure; everyone runs it
> locally in a well-defined context.
>
> So, as Jörn said, what problems do you see with Docker?
>
> Best,
> Fabian
>
> Am Mo., 4. März 2019 um 13:44 Uhr schrieb Jörn Franke:
>
>> It would help to understand the current issues that you have with this
>> approach. I used a similar approach (not with Flink, but with a similar
>> big data technology) some years ago.
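The partial-credit idea above (grade by how much of the reference output a student's solution reproduces, instead of all-or-nothing) can be sketched independently of Flink. A minimal sketch; `partial_score` and the sample records are purely illustrative:

```python
from collections import Counter

def partial_score(expected, actual):
    """Fraction of the reference records the student's output reproduces,
    compared as multisets since a stream gives no global ordering."""
    exp, act = Counter(expected), Counter(actual)
    matched = sum(min(exp[rec], act[rec]) for rec in exp)
    return matched / max(1, sum(exp.values()))

reference = [("a", 1), ("b", 2), ("c", 3)]
student = [("b", 2), ("a", 1), ("c", 99)]  # one wrong count
print(partial_score(reference, student))  # 2 of 3 records match
```

Counting matches as a multiset tolerates reordering across keys while still penalizing wrong or missing records, which gives a finer-grained grade than exact output comparison.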
Re: Using Flink in a university course
Hi all,

Thanks for the input. Much appreciated.

Regards,
Wouter

Op ma 4 mrt. 2019 om 20:40 schreef Addison Higham:

> Hi there,
>
> As far as a runtime for students goes, it seems like Docker is your best
> bet. However, you could instead have them package a jar using some
> interface (for example, see
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/packaging.html,
> which details the `Program` interface) and then execute it inside a custom
> runner. That *might* result in something less prone to breakage, as it
> would need to conform to an interface, but it may require a fair amount of
> custom code to reduce the boilerplate of building up a program plan, as
> well as the custom runner. The code for how Flink loads a jar and turns it
> into something it can execute is mostly encapsulated in
> org.apache.flink.client.program.PackagedProgram, which might be a good
> thing to read and understand if you go down this route.
>
> If you want more insight, you could build some tooling to traverse the
> underlying graphs that the students build up in their DataStream
> applications. For example, calling
> `StreamExecutionEnvironment.getStreamGraph` after the data stream is built
> will get a graph of the current job, which you can then traverse to see
> which operators and edges are in use. This is very similar to the process
> Flink uses to build the job DAG it renders in the UI. I am not sure what
> you could do as an automated analysis, but the StreamGraph API is quite
> low-level and exposes a lot of information about the program.
>
> Hopefully that is a little bit helpful. Good luck, and it sounds like a
> fun course!
Challenges using Flink REST API
an option, because the (main) idea behind this framework is to reduce the boilerplate and cumbersomeness of setting up complex stream-processing architectures.

Any help is appreciated. Thanks in advance!

Kind regards,
Wouter Zorgdrager
Re: Challenges using Flink REST API
Hi Chesnay,

Unfortunately this is not the case when I run the Flink 1.7.2 Docker images. The response is still:

{
  "errors": [
    "org.apache.flink.client.program.ProgramInvocationException: The main method caused an error."
  ]
}

Regards,
Wouter Zorgdrager

Op wo 13 mrt. 2019 om 10:42 schreef Chesnay Schepler:

> You should get the full stacktrace if you upgrade to 1.7.2.
>
> On 13.03.2019 09:55, Wouter Zorgdrager wrote:
>
> Hey all!
>
> I'm looking for some advice on the following: I'm working on an
> abstraction on top of Apache Flink to 'pipeline' Flink applications using
> Kafka. For deployment this means that all these Flink jobs are embedded
> into one jar, and each job is started using a program argument (e.g.
> "--stage 'FirstFlinkJob'"). To ease deploying a set of interconnected
> Flink jobs onto a cluster, I wrote a Python script which basically
> communicates with the REST API of the JobManager. So you can do things
> like "pipeline start --jar 'JarWithThePipeline.jar'", and this would
> deploy every Flink application separately.
>
> However, this script was written a while ago against Flink version 1.4.2.
> This week I tried to upgrade it to Flink's latest version, but I noticed a
> change in the REST responses. In order to get the "pipeline start" command
> working, we need to know all the Flink jobs that are in the jar (we call
> these Flink jobs 'stages'), because we need the stage names as arguments
> for the jar. For the 1.4.2 version we used a dirty trick: we ran the jar
> with '--list --asException' as program arguments, which basically runs the
> jar file and immediately throws an exception containing the stage names.
> These are then parsed and used to start every stage separately. The error
> message that Flink threw looked something like this:
>
> java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar.
> 	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
> 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
> 	... 9 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
> 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
> 	at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> 	at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
> 	at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
> 	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
> 	... 8 more
> Caused by: org.codefeedr.pipeline.PipelineListException: ["org.codefeedr.plugin.twitter.stages.TwitterStatusInput","mongo_tweets","elasticsearch_tweets"]
> 	at org.codefeedr.pipeline.Pipeline.showList(Pipeline.scala:114)
> 	at org.codefeedr.pipeline.Pipeline.start(Pipeline.scala:100)
> 	at nl.wouterr.Main$.main(Main.scala:23)
> 	at nl.wouterr.Main.main(Main.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>
> However, for 1.7.0 this trick doesn't work anymore, because instead of
> returning the full stack trace, it only returns the following:
> org.apache.flink.client.program.ProgramInvocationException: The program
> caused an error:
>
> In the c
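The Python script's interaction with the JobManager boils down to Flink's REST endpoints: upload the fat jar via POST /jars/upload, then start each stage via POST /jars/:jarid/run with its program arguments. A sketch of building the run request; the base URL and jar ID below are hypothetical:

```python
from urllib.parse import urlencode

def jar_run_url(base, jar_id, entry_class=None, program_args=None):
    """Build the URL for POST /jars/:jarid/run on the JobManager REST API."""
    query = {}
    if entry_class:
        query["entry-class"] = entry_class
    if program_args:
        query["program-args"] = program_args
    url = base.rstrip("/") + "/jars/" + jar_id + "/run"
    return url + ("?" + urlencode(query) if query else "")

# One request per stage, e.g. to start 'FirstFlinkJob':
url = jar_run_url("http://jobmanager:8081", "pipeline.jar",
                  program_args="--stage FirstFlinkJob")
# POST this URL (empty body) with e.g. urllib.request to submit the stage.
```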
Re: Challenges using Flink REST API
Hey Chesnay, Actually I was mistaken by stating that in the JobManager logs I got the full stacktrace because I actually got the following there: 2019-03-13 11:55:13,906 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Exception occurred in REST handler: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. By some googling I came across this Jira issue [1], which seems to fix my issue in 1.8.0. However, I was still confused why this ever worked for me in 1.4.2 and by checking some binaries I found out that the REST API was reworked for 1.5.0 [2] which removed the full stack trace. Is there any (official) Docker image to already run Flink 1.8? Thanks, Wouter [1]: https://jira.apache.org/jira/browse/FLINK-11423 [2]: *https://jira.apache.org/jira/browse/FLINK-7715 <https://jira.apache.org/jira/browse/FLINK-7715>* Op wo 13 mrt. 2019 om 12:18 schreef Chesnay Schepler : > Can you give me the stacktrace that is logged in the JobManager logs? > > > On 13.03.2019 10:57, Wouter Zorgdrager wrote: > > Hi Chesnay, > > Unfortunately this is not true when I run the Flink 1.7.2 docker images. > The response is still: > { > "errors": [ > "org.apache.flink.client.program.ProgramInvocationException: The > main method caused an error." > ] > } > > Regards, > Wouter Zorgdrager > > Op wo 13 mrt. 2019 om 10:42 schreef Chesnay Schepler : > >> You should get the full stacktrace if you upgrade to 1.7.2 . >> >> >> On 13.03.2019 09:55, Wouter Zorgdrager wrote: >> >> Hey all! >> >> I'm looking for some advice on the following; I'm working on an >> abstraction on top of Apache Flink to 'pipeline' Flink applications using >> Kafka. For deployment this means that all these Flink jobs are embedded >> into one jar and each job is started using an program argument (e.g. >> "--stage 'FirstFlinkJob'". 
To ease deploying a set of interconnected Flink >> jobs onto a cluster I wrote a Python script which basically communicates >> with the REST API of the JobManager. So you can do things like "pipeline >> start --jar 'JarWithThePipeline.jar'" and this would deploy every Flink >> application separately. >> >> However, this script was written a while ago against Flink version >> "1.4.2". This week I tried to upgrade it to Flink's latest version, but I >> noticed a change in the REST responses. In order to get the "pipeline >> start" command working, we need to know all the Flink jobs that are in the >> jar (we call these Flink jobs 'stages') because we need to know the stage >> names as arguments for the jar. For the 1.4.2 version we used a dirty trick: >> we ran the jar with '--list --asException' as program arguments, which >> basically runs the jar file and immediately throws an exception with the >> stage names. These are then parsed and used to start every stage >> separately. The error message that Flink threw looked something like this: >> >> java.util.concurrent.CompletionException: >> org.apache.flink.util.FlinkException: Could not run the jar. 
>> at >> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90) >> at >> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) >> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) >> at >> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: org.apache.flink.util.FlinkException: Could not run the jar. >> ... 9 more >> Caused by: org.apache.flink.client.program.ProgramInvocationException: >> The main method caused an error. >> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542) >> at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417) >> at >> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) >> at >> org.apache.flink.client.program.ClusterClie
Case class field limit
Hey all, Since Scala 2.11 the number of fields in a case class isn't restricted to 22 anymore [1]. I was wondering if Flink still uses this limit internally; if I check the documentation [2] I also see a max of 22 fields. However, I just ran a simple test setup with a case class of more than 22 fields and this worked fine. Is the documentation outdated or am I missing something? Hope someone can clarify! Cheers, Wouter [1] - https://github.com/scala/bug/issues/7296 [2] - https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/types_serialization.html#flinks-typeinformation-class
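For reference, the kind of test described above can be sketched roughly as follows (a sketch, not the original test code; field names are illustrative and it assumes Flink's Scala DataStream API on the classpath):

```scala
import org.apache.flink.streaming.api.scala._

// A case class with 23 fields, i.e. beyond the old limit of 22.
case class Wide(
  f1: Int, f2: Int, f3: Int, f4: Int, f5: Int, f6: Int, f7: Int, f8: Int,
  f9: Int, f10: Int, f11: Int, f12: Int, f13: Int, f14: Int, f15: Int,
  f16: Int, f17: Int, f18: Int, f19: Int, f20: Int, f21: Int, f22: Int,
  f23: Int)

object WideCaseClassTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Serializing this element forces Flink to derive TypeInformation
    // for the 23-field case class, which is where a field limit would
    // surface if one still existed.
    env.fromElements(Wide(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,
        14, 15, 16, 17, 18, 19, 20, 21, 22, 23))
      .map(w => w.f23)
      .print()
    env.execute("wide-case-class-test")
  }
}
```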
Re: Case class field limit
Done! https://issues.apache.org/jira/browse/FLINK-11996 On Fri 22 Mar 2019 at 11:52, Chesnay Schepler wrote: > It is likely that the documentation is outdated. Could you open a JIRA for > updating the documentation? > > On 22/03/2019 10:12, Wouter Zorgdrager wrote: > > Hey all, > > > > Since Scala 2.11 the amount of fields in a case class isn't restricted > > to 22 anymore [1]. I was wondering if Flink still uses this limit > > internally, if I check the documentation [2] I also see a max of 22 > > fields. However, I just ran a simple test setup with a case class > 22 > > fields and this worked fine. Is the documentation outdated or am I > > missing something? > > > > Hope someone can clarify! > > > > Cheers, > > Wouter > > > > > > [1] - https://github.com/scala/bug/issues/7296 > > [2] - > > > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/types_serialization.html#flinks-typeinformation-class > > > > >
Re: Read mongo datasource in Flink
For a framework I'm working on, we actually implemented a (basic) Mongo source [1]. It's written in Scala and uses Json4s [2] to parse the data into a case class. It uses a Mongo observer to iterate over a collection and emit it into a Flink context. Cheers, Wouter [1]: https://github.com/codefeedr/codefeedr/blob/develop/codefeedr-plugins/codefeedr-mongodb/src/main/scala/org/codefeedr/plugins/mongodb/BaseMongoSource.scala [2]: http://json4s.org/ On Mon 29 Apr 2019 at 13:57, Flavio Pompermaier wrote: > I'm not aware of an official source/sink... if you want, you could try to > exploit the Mongo HadoopInputFormat as in [1]. > The provided link uses a pretty old version of Flink, but it should not be a > big problem to update the maven dependencies and the code to a newer > version. > > Best, > Flavio > > [1] https://github.com/okkam-it/flink-mongodb-test > > On Mon, Apr 29, 2019 at 6:15 AM Hai wrote: > >> Hi, >> >> >> Can anyone give me a clue about how to read mongodb’s data as a >> batch/streaming datasource in Flink? I don’t find the mongodb connector in >> recent release versions. >> >> >> Many thanks >> > >
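The observer-based source described above boils down to roughly the following shape (a simplified, hypothetical sketch rather than the linked implementation: non-parallel, no checkpointing, and it skips the checkpoint lock; assumes the mongo-scala-driver):

```scala
import java.util.concurrent.CountDownLatch

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.mongodb.scala.{Document, MongoClient}

// Emits every document of one collection into the Flink context as JSON.
class BasicMongoSource(uri: String, db: String, coll: String)
    extends SourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val client     = MongoClient(uri)
    val collection = client.getDatabase(db).getCollection(coll)
    val done       = new CountDownLatch(1)

    // The driver is observer-based: onNext fires once per document.
    collection.find().subscribe(
      (doc: Document) => if (running) ctx.collect(doc.toJson()),
      (_: Throwable)  => done.countDown(), // onError: stop the source
      ()              => done.countDown()  // onComplete: cursor exhausted
    )

    done.await() // block run() until the collection has been read
    client.close()
  }

  override def cancel(): Unit = running = false
}
```

As noted in the reply below, a source like this is inherently single-threaded; parallelising it would mean partitioning the collection across subtasks.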
Re: Read mongo datasource in Flink
Yes, that is correct. This is a really basic implementation that doesn't take parallelism into account. I think you need something like this [1] to get that working. [1]: https://docs.mongodb.com/manual/reference/command/parallelCollectionScan/#dbcmd.parallelCollectionScan Op ma 29 apr. 2019 om 14:37 schreef Flavio Pompermaier : > But what about parallelism with this implementation? From what I see > there's only a single thread querying Mongo and fetching all the data..am I > wrong? > > On Mon, Apr 29, 2019 at 2:05 PM Wouter Zorgdrager < > w.d.zorgdra...@tudelft.nl> wrote: > >> For a framework I'm working on, we actually implemented a (basic) Mongo >> source [1]. It's written in Scala and uses Json4s [2] to parse the data >> into a case class. It uses a Mongo observer to iterate over a collection >> and emit it into a Flink context. >> >> Cheers, >> Wouter >> >> [1]: >> https://github.com/codefeedr/codefeedr/blob/develop/codefeedr-plugins/codefeedr-mongodb/src/main/scala/org/codefeedr/plugins/mongodb/BaseMongoSource.scala >> >> [2]: http://json4s.org/ >> >> Op ma 29 apr. 2019 om 13:57 schreef Flavio Pompermaier < >> pomperma...@okkam.it>: >> >>> I'm not aware of an official source/sink..if you want you could try to >>> exploit the Mongo HadoopInputFormat as in [1]. >>> The provided link use a pretty old version of Flink but it should not be >>> a big problem to update the maven dependencies and the code to a newer >>> version. >>> >>> Best, >>> Flavio >>> >>> [1] https://github.com/okkam-it/flink-mongodb-test >>> >>> On Mon, Apr 29, 2019 at 6:15 AM Hai wrote: >>> >>>> Hi, >>>> >>>> >>>> Can anyone give me a clue about how to read mongodb’s data as a >>>> batch/streaming datasource in Flink? I don’t find the mongodb connector in >>>> recent release version . >>>> >>>> >>>> Many thanks >>>> >>> >>> >
Preserve accumulators after failure in DataStream API
Hi all, In the documentation I read about UDF accumulators [1]: "Accumulators are automatically backup-ed by Flink’s checkpointing mechanism and restored in case of a failure to ensure exactly-once semantics." So I assumed this was also the case for accumulators used in the DataStream API, but I noticed that it isn't. So every time my job crashes and restarts, the accumulator is reset. Is there a way to retain this information? Thanks, Wouter [1]. https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
Re: Preserve accumulators after failure in DataStream API
Hi Fabian, Maybe I should clarify a bit: actually I'm using a (Long)Counter registered as an Accumulator in the RuntimeContext [1]. So I'm using a KeyedProcessFunction, not an AggregateFunction. This works properly, but is not retained after a job restart. I'm not entirely sure if I did this correctly. Thx, Wouter [1]. https://github.com/codefeedr/ghtorrent_mirror/blob/01e5cde837342993c7d287c60e40762e98f8d010/src/main/scala/org/codefeedr/experimental/stats/CommitsStatsProcess.scala#L34 On Thu 2 May 2019 at 09:36, Fabian Hueske wrote: > Hi Wouter, > > The DataStream API accumulators of the AggregateFunction [1] are stored in > state and should be recovered in case of a failure as well. > If this does not work, it would be a serious bug. > > What's the type of your accumulator? > Can you maybe share the code? > How do you apply the AggregateFunction (window, windowAll, ...)? > > Thanks, > Fabian > > [1] > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java > > On Tue 30 Apr 2019 at 13:19, Wouter Zorgdrager < > w.d.zorgdra...@tudelft.nl> wrote: > >> Hi all, >> >> In the documentation I read about UDF accumulators [1] "Accumulators are >> automatically backup-ed by Flink’s checkpointing mechanism and restored in >> case of a failure to ensure exactly-once semantics." So I assumed this >> also was the case of accumulators used in the DataStream API, but I noticed >> that it isn't. So every time my jobs crashes and restarts, the accumulator >> is reset. Is there a way to retain this information? >> >> Thanks, >> Wouter >> >> >> [1]. >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html >> >
Re: Preserve accumulators after failure in DataStream API
+1, especially if you don't want to rely on external metric reporter this is a nice feature. Op do 2 mei 2019 om 10:29 schreef Fabian Hueske : > Hi, > > Both of you seem to have the same requirement. > This is a good indication that "fault-tolerant metrics" are a missing > feature. > It might make sense to think about a built-in mechanism to back metrics > with state. > > Cheers, > Fabian > > > > Am Do., 2. Mai 2019 um 10:25 Uhr schrieb Paul Lam : > >> Hi Wouter, >> >> I've met the same issue and finally managed to use operator states to >> back the accumulators, so they can be restored after restarts. >> The downside is that we have to update the values in both accumulators >> and states to make them consistent. FYI. >> >> Best, >> Paul Lam >> >> Fabian Hueske 于2019年5月2日周四 下午4:17写道: >> >>> Hi Wouter, >>> >>> OK, that explains it :-) Overloaded terms... >>> >>> The Table API / SQL documentation refers to the accumulator of an >>> AggregateFunction [1]. >>> The accumulators that are accessible via the RuntimeContext are a rather >>> old part of the API that is mainly intended for batch jobs. >>> >>> I would not use them for streaming applications as they are not >>> checkpointed and recovered (as you noticed). >>> You should use managed state (keyed or operator) for such use cases. >>> >>> Best, >>> Fabian >>> >>> [1] >>> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java >>> >>> Am Do., 2. Mai 2019 um 10:01 Uhr schrieb Wouter Zorgdrager < >>> w.d.zorgdra...@tudelft.nl>: >>> >>>> Hi Fabian, >>>> >>>> Maybe I should clarify a bit, actually I'm using a (Long)Counter >>>> registered as Accumulator in the RuntimeContext [1]. So I'm using a >>>> KeyedProcessFunction, not an AggregateFunction. This works property, but is >>>> not retained after a job restart. I'm not entirely sure if I did this >>>> correct. >>>> >>>> Thx, >>>> Wouter >>>> >>>> >>>> [1]. 
>>>> https://github.com/codefeedr/ghtorrent_mirror/blob/01e5cde837342993c7d287c60e40762e98f8d010/src/main/scala/org/codefeedr/experimental/stats/CommitsStatsProcess.scala#L34 >>>> >>>> >>>> >>>> Op do 2 mei 2019 om 09:36 schreef Fabian Hueske : >>>> >>>>> Hi Wouter, >>>>> >>>>> The DataStream API accumulators of the AggregateFunction [1] are >>>>> stored in state and should be recovered in case of a failure as well. >>>>> If this does not work, it would be a serious bug. >>>>> >>>>> What's the type of your accumulator? >>>>> Can you maybe share the code? >>>>> How to you apply the AggregateFunction (window, windowAll, ...)? >>>>> >>>>> Thanks, >>>>> Fabian >>>>> >>>>> [1] >>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java >>>>> >>>>> Am Di., 30. Apr. 2019 um 13:19 Uhr schrieb Wouter Zorgdrager < >>>>> w.d.zorgdra...@tudelft.nl>: >>>>> >>>>>> Hi all, >>>>>> >>>>>> In the documentation I read about UDF accumulators [1] "Accumulators >>>>>> are automatically backup-ed by Flink’s checkpointing mechanism and >>>>>> restored >>>>>> in case of a failure to ensure exactly-once semantics." So I assumed >>>>>> this also was the case of accumulators used in the DataStream API, but I >>>>>> noticed that it isn't. So every time my jobs crashes and restarts, the >>>>>> accumulator is reset. Is there a way to retain this information? >>>>>> >>>>>> Thanks, >>>>>> Wouter >>>>>> >>>>>> >>>>>> [1]. >>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html >>>>>> >>>>>
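For anyone landing on this thread: the state-backed approach Paul describes can be sketched roughly as follows inside a KeyedProcessFunction (keyed state in this sketch; names are illustrative, and reporting the value as a metric or accumulator is still up to you):

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class CountingProcess extends KeyedProcessFunction[String, String, String] {

  // Keyed ValueState is checkpointed, so it survives restarts,
  // unlike a counter registered via getRuntimeContext.addAccumulator.
  @transient private var count: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
  }

  override def processElement(
      value: String,
      ctx: KeyedProcessFunction[String, String, String]#Context,
      out: Collector[String]): Unit = {
    val current = Option(count.value()).map(_.longValue()).getOrElse(0L)
    count.update(current + 1) // restored from the last checkpoint on failure
    out.collect(value)
  }
}
```

The downside Paul mentions applies here too: if you also expose the value as a metric, you have to keep the metric and the state in sync yourself.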
Flink and Prometheus setup in K8s
Hey all, I'm working on a deployment setup with Flink and Prometheus on Kubernetes. I'm running into the following issues: 1) Is it possible to use the default Flink Docker image [1] and enable the Prometheus reporter? Modifying the flink-conf.yaml is easy, but somehow the Prometheus reporter jar needs to be moved within the image. This is easy if I use my own Dockerfile (as done here [2]), but I prefer using the official one. 2) I can define the jobmanager/taskmanager metric endpoints statically, but w.r.t. scaling I prefer to have these resolved/discovered dynamically. Did anyone get a working setup for this? I came across this resource for YARN [3]; is there something similar for Kubernetes? Or are there any other ways of configuring Prometheus to pick this up automatically? Thanks a lot for your help! Kind regards, Wouter [1]: https://hub.docker.com/_/flink/ [2]: https://github.com/mbode/flink-prometheus-example/blob/master/Dockerfile [3]: https://github.com/eastcirclek/flink-service-discovery
Re: Flink and Prometheus setup in K8s
Hi all, To answer my own questions I worked on the following solution: 1) Custom Docker image which pulls the Flink image and moves Prometheus jar to the correct folder [1, 2]. 2) I wrote manifests for Kubernetes with service discovery configuration for Kubernetes [3]. Besides the 'official' Flink Kubernetes manifests, I added a TM service which exposes all TM metrics locations so that Prometheus can scrape it. This means that (re)-scaling Flink TM's are automatically picked up by Prometheus. The repository also includes a Grafana setup with a simple dashboard. I thought this might be useful for other users! Cheers, Wouter [1]: https://github.com/wzorgdrager/flink-k8s/blob/master/docker/Dockerfile [2]: https://hub.docker.com/r/wzorgdrager/flink-prometheus [3]: https://github.com/wzorgdrager/flink-k8s Op ma 13 mei 2019 om 14:16 schreef Wouter Zorgdrager < w.d.zorgdra...@tudelft.nl>: > Hey all, > > I'm working on a deployment setup with Flink and Prometheus on Kubernetes. > I'm running into the following issues: > > 1) Is it possible to use the default Flink Docker image [1] and enable the > Prometheus reporter? Modifying the flink-config.yaml is easy, but somehow > the Prometheus reporter jar needs to be moved within the image. This is > easy if use my own Dockerfile (as done here [2]) , but I prefer using the > official one. > 2) I can define the jobmanager/taskmanager metric endpoints statically, > but w.r.t. scaling I prefer to have these resolved/discovered dynamically. > Did anyone get a working setup on this? I came across this resource for > YARN [3], is there something similar for Kubernetes? Or are there any other > ways of configuring Prometheus to pick this up automatically? > > Thanks a lot for your help! > > Kind regards, > Wouter > > [1]: https://hub.docker.com/_/flink/ > [2]: > https://github.com/mbode/flink-prometheus-example/blob/master/Dockerfile > [3]: https://github.com/eastcirclek/flink-service-discovery >
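For readers with the same question: the Docker side of point (1) comes down to something like the Dockerfile below (a sketch, not the exact published Dockerfile from [1]; the base tag, jar version, and the assumption that the reporter jar ships under opt/ in the official image layout are all illustrative):

```dockerfile
FROM flink:1.8.0

# The Prometheus reporter ships with the Flink distribution under opt/;
# it is only picked up once it sits on the lib/ classpath.
RUN cp /opt/flink/opt/flink-metrics-prometheus-1.8.0.jar /opt/flink/lib/

# Enable the reporter by appending to the image's flink-conf.yaml.
RUN echo "metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter" >> /opt/flink/conf/flink-conf.yaml && \
    echo "metrics.reporter.prom.port: 9249" >> /opt/flink/conf/flink-conf.yaml
```

Each TaskManager then exposes its metrics on port 9249, which is the endpoint the Kubernetes service in [3] lets Prometheus discover and scrape.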
Re: Flink not giving full reason as to why job submission failed
Hi Harshith, This was indeed an issue in 1.7.2, but it was fixed in 1.8.0. See the corresponding Jira issue [1]. Cheers, Wouter [1]: https://issues.apache.org/jira/browse/FLINK-11902 On Thu 16 May 2019 at 16:05, Kumar Bolar, Harshith wrote: > Hi all, > > > > After upgrading Flink to 1.7.2, when I try to submit a job from the > dashboard and there's some issue with the job, the job submission fails > with the following error. > > > > Exception occurred in REST handler: > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error. > > There's no other reason given as to why the job failed to submit. This was > not the case in 1.4.2. Is there a way to see the full reason why the job > failed to deploy? I see the same error in the logs too with no additional > information. > > Thanks, > > Harshith >
Re: Re: Flink not giving full reason as to why job submission failed
Hi Harshith, Indeed, this issue is not fully resolved in 1.8. I added a comment to the (closed) Jira issue, so this might be fixed in a future release. Cheers, Wouter On Mon 20 May 2019 at 16:18, Kumar Bolar, Harshith wrote: > Hi Wouter, > > > > I’ve upgraded Flink to 1.8, but now I only see Internal server error on > the dashboard when a job deployment fails. > > > > > > But in the logs I see the correct exception - > Caused by: java.lang.ArrayIndexOutOfBoundsException: 1 > > at > functions.PersistPIDMessagesToCassandra.main(PersistPIDMessagesToCassandra.java:59) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) > > ... 9 more > > > > Is there any way to show these errors on the dashboard? Because many > application teams deploy jobs through the dashboard and don’t have ready > access to the logs. > > > > Thanks, > > Harshith > > > > *From: *Wouter Zorgdrager > *Date: *Thursday, 16 May 2019 at 7:56 PM > *To: *Harshith Kumar Bolar > *Cc: *user > *Subject: *[External] Re: Flink not giving full reason as to why job > submission failed > > > > Hi Harshith, > > > > This was indeed an issue in 1.7.2, but fixed in 1.8.0. See the > corresponding Jira issue [1]. 
> > > > Cheers, > > Wouter > > > > [1]: https://issues.apache.org/jira/browse/FLINK-11902 > > > > > On Thu 16 May 2019 at 16:05, Kumar Bolar, Harshith wrote: > > Hi all, > > > > After upgrading Flink to 1.7.2, when I try to submit a job from the > dashboard and there's some issue with the job, the job submission fails > with the following error. > > > > Exception occurred in REST handler: > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error. > > There's no other reason given as to why the job failed to submit. This was > not the case in 1.4.2. Is there a way to see the full reason why the job > failed to deploy? I see the same error in the logs too with no additional > information. > > Thanks, > > Harshith > >
Unexpected behavior from interval join in Flink
Hi all, I'm experiencing some unexpected behavior using an interval join in Flink. I'm dealing with two data sets, let's call them X and Y. They are finite (10k elements), but I interpret them as a DataStream. The data needs to be joined for enrichment purposes. I use event time and I know (because I generated the data myself) that the timestamp of an element Y is always between -60 minutes and +30 minutes of the element with the same key in set X. Both datasets are in-order (in terms of timestamps), equal in size, share a common key, and parallelism is set to 1 throughout the whole program. The code to join looks something like this:

xStream
  .assignAscendingTimestamps(_.date.getTime)
  .keyBy(_.commonKey)
  .intervalJoin(
    yStream
      .assignAscendingTimestamps(_.date.getTime)
      .keyBy(_.commonKey))
  .between(Time.minutes(-60), Time.minutes(30))
  .process(new ProcessJoinFunction[X, Y, String] {
    override def processElement(
        left: X,
        right: Y,
        ctx: ProcessJoinFunction[X, Y, String]#Context,
        out: Collector[String]): Unit = {
      out.collect(left + ":" + right)
    }
  })

However, about 30% of the data is not joined. Is there a proper way to debug this? For instance, in windows you can side-output late data. Is there a possibility to side-output unjoinable data? Thx a lot, Wouter
Re: Unexpected behavior from interval join in Flink
Anyone some leads on this issue? Have been looking into the IntervalJoinOperator code, but that didn't really help. My intuition is that it is rejected because of lateness, however that still confuses me since I'm sure that both datastreams have monotonic increasing timestamps. Thx, Wouter Op ma 17 jun. 2019 om 13:20 schreef Wouter Zorgdrager < w.d.zorgdra...@tudelft.nl>: > Hi all, > > I'm experiencing some unexpected behavior using an interval join in Flink. > I'm dealing with two data sets, lets call them X and Y. They are finite > (10k elements) but I interpret them as a DataStream. The data needs to be > joined for enrichment purposes. I use event time and I know (because I > generated the data myself) that the timestamp of an element Y is always > between -60 minutes and +30 minutes of the element with the same key in set > X. Both datasets are in-order (in terms of timestamps), equal in size, > share a common key and parallelism is set to 1 throughout the whole program. > > The code to join looks something like this: > > xStream > .assignAscendingTimestamps(_.date.getTime) > .keyBy(_.commonKey) > .intervalJoin( > yStream > .assignAscendingTimestamps(_.date.getTime) > .keyBy(_.commonKey)) > .between(Time.minutes(-60), Time.minutes(30)) > .process(new ProcessJoinFunction[X, Y, String] { > override def processElement( > left: X, > right: Y, > ctx: ProcessJoinFunction[X, Y, String]#Context, > out: Collector[String]): Unit = { > > out.collect(left + ":" + right) > } > > > However, about 30% percent of the data is not joined. Is there a proper > way to debug this? For instance, in windows you can side-output late data. > Is there a possibility to side-output unjoinable data? > > Thx a lot, > Wouter > > >
Re: Unexpected behavior from interval join in Flink
Hi Fabian, Thanks for your reply. I managed to resolve this issue. Actually this behavior was not so unexpected: I messed up by using xStream as a 'base' while I needed to use yStream as a 'base'. I.e. yStream.element - 60 min <= xStream.element <= yStream.element + 30 min. Interchanging both datastreams fixed this issue. Thanks anyway. Cheers, Wouter On Mon 24 Jun 2019 at 11:22, Fabian Hueske wrote: > Hi Wouter, > > Not sure what is going wrong there, but something that you could try is to > use a custom watermark assigner and always return a watermark of 0. > When the source finished serving the watermarks, it emits a final > Long.MAX_VALUE watermark. > Hence the join should consume all events and store them in state. When > both sources are finished, it would start to join the data and clean up the > state. > This test would show if there are any issue with late data. > > Best, Fabian > > On Fri 21 Jun 2019 at 15:32, Wouter Zorgdrager < > w.d.zorgdra...@tudelft.nl> wrote: > >> Anyone some leads on this issue? Have been looking into the >> IntervalJoinOperator code, but that didn't really help. My intuition is >> that it is rejected because of lateness, however that still confuses me >> since I'm sure that both datastreams have monotonic increasing timestamps. >> >> Thx, Wouter >> >> On Mon 17 Jun 2019 at 13:20, Wouter Zorgdrager < >> w.d.zorgdra...@tudelft.nl> wrote: >> >>> Hi all, >>> >>> I'm experiencing some unexpected behavior using an interval join in >>> Flink. >>> I'm dealing with two data sets, lets call them X and Y. They are finite >>> (10k elements) but I interpret them as a DataStream. The data needs to be >>> joined for enrichment purposes. I use event time and I know (because I >>> generated the data myself) that the timestamp of an element Y is always >>> between -60 minutes and +30 minutes of the element with the same key in set >>> X. 
Both datasets are in-order (in terms of timestamps), equal in size, >>> share a common key and parallelism is set to 1 throughout the whole program. >>> >>> The code to join looks something like this: >>> >>> xStream >>> .assignAscendingTimestamps(_.date.getTime) >>> .keyBy(_.commonKey) >>> .intervalJoin( >>> yStream >>> .assignAscendingTimestamps(_.date.getTime) >>> .keyBy(_.commonKey)) >>> .between(Time.minutes(-60), Time.minutes(30)) >>> .process(new ProcessJoinFunction[X, Y, String] { >>> override def processElement( >>> left: X, >>> right: Y, >>> ctx: ProcessJoinFunction[X, Y, String]#Context, >>> out: Collector[String]): Unit = { >>> >>> out.collect(left + ":" + right) >>> } >>> >>> >>> However, about 30% percent of the data is not joined. Is there a proper >>> way to debug this? For instance, in windows you can side-output late data. >>> Is there a possibility to side-output unjoinable data? >>> >>> Thx a lot, >>> Wouter >>> >>> >>>
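Concretely: since a.intervalJoin(b).between(lower, upper) keeps the pairs where b's timestamp lies in [a.timestamp + lower, a.timestamp + upper], the fix described above amounts to making yStream the left side (a sketch with the same illustrative types X and Y as the original snippet):

```scala
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// yStream is now the 'base': for each y, match the x elements with
// y.timestamp - 60 min <= x.timestamp <= y.timestamp + 30 min.
yStream
  .assignAscendingTimestamps(_.date.getTime)
  .keyBy(_.commonKey)
  .intervalJoin(
    xStream
      .assignAscendingTimestamps(_.date.getTime)
      .keyBy(_.commonKey))
  .between(Time.minutes(-60), Time.minutes(30))
  .process(new ProcessJoinFunction[Y, X, String] {
    override def processElement(
        left: Y,
        right: X,
        ctx: ProcessJoinFunction[Y, X, String]#Context,
        out: Collector[String]): Unit = {
      out.collect(right + ":" + left) // keep the original X:Y output order
    }
  })
```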
RichAsyncFunction in Scala
Hi, Currently there is no way of using the RichAsyncFunction in Scala, which means I can't get access to the RuntimeContext. I know someone is working on this: https://issues.apache.org/jira/browse/FLINK-6756. However, in the meantime, is there a workaround for this? I'm particularly interested in getting the index of the subtask in my AsyncFunction. Regards, Wouter
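One workaround that has been used while FLINK-6756 was open is to bypass the Scala AsyncDataStream and extend the Java RichAsyncFunction directly, applying it to the underlying Java stream. The sketch below is untested and the exact signatures depend on your Flink version (in particular, older versions use AsyncCollector instead of ResultFuture):

```scala
import java.util.Collections
import java.util.concurrent.TimeUnit

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.{AsyncDataStream => JavaAsyncDataStream}
import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}
import org.apache.flink.streaming.api.scala.DataStream

// Extending the *Java* RichAsyncFunction from Scala gives RuntimeContext access.
class EnrichFunction extends RichAsyncFunction[String, String] {
  @transient private var subtaskIndex: Int = _

  override def open(parameters: Configuration): Unit =
    subtaskIndex = getRuntimeContext.getIndexOfThisSubtask

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit =
    // Real code would complete this from an async client's callback.
    resultFuture.complete(Collections.singleton(s"$subtaskIndex:$input"))
}

object RichAsyncWorkaround {
  def apply(stream: DataStream[String]): DataStream[String] =
    // Drop down to the Java API, which accepts the rich variant,
    // then wrap the result back into a Scala DataStream.
    new DataStream(JavaAsyncDataStream.unorderedWait(
      stream.javaStream, new EnrichFunction, 5000L, TimeUnit.MILLISECONDS))
}
```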
KafkaProducer with generic (Avro) serialization schema
Dear reader, I'm currently working on writing a KafkaProducer which is able to serialize a generic type using avro4s. However, this serialization schema is not serializable itself. Here is my code for this:

The serialization schema:

class AvroSerializationSchema[IN : SchemaFor : FromRecord : ToRecord] extends SerializationSchema[IN] {

  override def serialize(element: IN): Array[Byte] = {
    val byteArray = new ByteArrayOutputStream()
    val avroSer = AvroOutputStream.binary[IN](byteArray)
    avroSer.write(element)
    avroSer.flush()
    avroSer.close()

    byteArray.toByteArray
  }
}

The job code:

case class Person(name: String, age: Int, address: Address)
case class Address(city: String, street: String)

class SimpleJob {

  @transient
  private lazy val serSchema: AvroSerializationSchema[Person] = new AvroSerializationSchema[Person]()

  def start() = {
    val testPerson = Person("Test", 100, Address("Test", "Test"))

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.
      fromCollection(Seq(testPerson)).
      addSink(createKafkaSink())

    env.execute("Flink sample job")
  }

  def createKafkaSink(): RichSinkFunction[Person] = {
    // set some properties
    val properties = new Properties()
    properties.put("bootstrap.servers", "127.0.0.1:9092")
    properties.put("zookeeper.connect", "127.0.0.1:2181")

    new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
  }
}

The code does compile, however it gives the following error at runtime: InvalidProgramException: Object org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d is not serializable. I assume this means that my custom SerializationSchema is not serializable due to the use of SchemaFor, FromRecord and ToRecord. Does anyone know a solution or workaround? Thanks in advance! Wouter
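One way to dodge the non-serializable context bounds entirely, sketched below under the assumption that working with Avro GenericRecord instead of case classes is acceptable, is to ship the Avro schema as a plain string (which is serializable) and rebuild the non-serializable writer lazily on the worker. This is not a drop-in fix for the avro4s version above, but the same @transient lazy val pattern is the key idea (the SerializationSchema import path varies by Flink version):

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.apache.flink.api.common.serialization.SerializationSchema

// Only the schema *string* travels with the job graph; everything
// non-serializable is rebuilt lazily after deserialization on the TM.
class GenericAvroSerializationSchema(schemaString: String)
    extends SerializationSchema[GenericRecord] {

  @transient private lazy val schema: Schema =
    new Schema.Parser().parse(schemaString)
  @transient private lazy val writer =
    new GenericDatumWriter[GenericRecord](schema)

  override def serialize(element: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(element, encoder)
    encoder.flush()
    out.toByteArray
  }
}
```

With avro4s you would then convert the case class to a GenericRecord (via ToRecord) before handing it to the sink, keeping the avro4s machinery out of the serialized closure.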
Re: KafkaProducer with generic (Avro) serialization schema
Hi Bill, Thanks for your answer. However this proposal isn't going to solve my issue, since the problem here is that the context bounds I need to give in order to serialize it to Avro (SchemaFor, ToRecord and FromRecord) aren't serializable classes. This results in Flink not being able to serialize the KafkaProducer, failing the whole job. Thanks, Wouter On Thu 26 Apr 2018 at 00:42, Nortman, Bill wrote: > The first things I would try: check whether your classes Person and Address > have getters and setters and a no-argument constructor. > > > > *From:* Wouter Zorgdrager [mailto:zorgdrag...@gmail.com] > *Sent:* Wednesday, April 25, 2018 7:17 AM > *To:* user@flink.apache.org > *Subject:* KafkaProducer with generic (Avro) serialization schema > > > > Dear reader, > > > > I'm currently working on writing a KafkaProducer which is able to > serialize a generic type using avro4s. > > However this serialization schema is not serializable itself. Here is my > code for this: > > > > The serialization schema: > > class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] > extends SerializationSchema[IN] { > > > > override def serialize(element: IN): Array[Byte] = { > > val byteArray = new ByteArrayOutputStream() > > val avroSer = AvroOutputStream.binary[IN](byteArray) > > avroSer.write(element) > > avroSer.flush() > > avroSer.close() > > > > return byteArray.toByteArray > > } > > } > > > > The job code: > > case class Person(name : String, age : Int, address : Address) > > case class Address(city : String, street : String) > > > > class SimpleJob { > > > > @transient > > private lazy val serSchema : AvroSerializationSchema[Person] = new > AvroSerializationSchema[Person]() > > > > def start() = { > > val testPerson = Person("Test", 100, Address("Test", "Test")) > > > > val env = StreamExecutionEnvironment.getExecutionEnvironment > > > > env. > > fromCollection(Seq(testPerson)). 
> > addSink(createKafkaSink()) > > > > env.execute("Flink sample job") > > } > > > > > > def createKafkaSink() : RichSinkFunction[Person] = { > > //set some properties > > val properties = new Properties() > > properties.put("bootstrap.servers", "127.0.0.01:9092") > > properties.put("zookeeper.connect", "127.0.0.1:2181") > > > > new FlinkKafkaProducer011[Person]("persons", serSchema, properties) > > } > > > > } > > > > The code does compile, however it gives the following error on > runtime: InvalidProgramException: Object > org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d > is not serializable. > > > > I assume this means that my custom SerializationSchema is not serializable > due to the use of SchemaFor, FromRecord and ToRecord. > > Anyone knows a solution or workaround? > > > > Thanks in advance! > > Wouter
Re: KafkaProducer with generic (Avro) serialization schema
So, I'm still struggling with this issue. I dived a bit deeper into the problem and I'm pretty sure the problem is that I have to (implicitly) pass the SchemaFor, FromRecord and ToRecord classes to my serialization schema (otherwise I can't make it generic). However, those classes aren't serializable, and of course I can't annotate them as transient nor make them a lazy val, which gives me the current issue. I hope someone has some leads for me. Thanks! On Thu 26 Apr 2018 at 23:03, Wouter Zorgdrager wrote: > Hi Bill, > > Thanks for your answer. However, this proposal isn't going to solve my > issue, since the problem here is that the context bounds I need to give in > order to serialize to Avro (SchemaFor, ToRecord and FromRecord) aren't > serializable classes. This results in Flink not being able to serialize the > KafkaProducer, failing the whole job. > > Thanks, > Wouter > > On Thu 26 Apr 2018 at 00:42, Nortman, Bill < william.nort...@pimco.com> wrote: > >> The first things I would try: check whether your classes Person and >> Address have getters and setters and a no-argument constructor. >> >> >> >> *From:* Wouter Zorgdrager [mailto:zorgdrag...@gmail.com] >> *Sent:* Wednesday, April 25, 2018 7:17 AM >> *To:* user@flink.apache.org >> *Subject:* KafkaProducer with generic (Avro) serialization schema >> >> >> >> Dear reader, >> >> >> >> I'm currently working on writing a KafkaProducer which is able to >> serialize a generic type using avro4s. >> >> However, this serialization schema is not serializable itself. 
Here is my >> code for this: >> >> >> >> The serialization schema: >> >> class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] >> extends SerializationSchema[IN] { >> >> >> >> override def serialize(element: IN): Array[Byte] = { >> >> val byteArray = new ByteArrayOutputStream() >> >> val avroSer = AvroOutputStream.binary[IN](byteArray) >> >> avroSer.write(element) >> >> avroSer.flush() >> >> avroSer.close() >> >> >> >> return byteArray.toByteArray >> >> } >> >> } >> >> >> >> The job code: >> >> case class Person(name : String, age : Int, address : Address) >> >> case class Address(city : String, street : String) >> >> >> >> class SimpleJob { >> >> >> >> @transient >> >> private lazy val serSchema : AvroSerializationSchema[Person] = new >> AvroSerializationSchema[Person]() >> >> >> >> def start() = { >> >> val testPerson = Person("Test", 100, Address("Test", "Test")) >> >> >> >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> >> >> >> env. >> >> fromCollection(Seq(testPerson)). 
>> >> addSink(createKafkaSink()) >> >> >> >> env.execute("Flink sample job") >> >> } >> >> >> >> >> >> def createKafkaSink() : RichSinkFunction[Person] = { >> >> //set some properties >> >> val properties = new Properties() >> >> properties.put("bootstrap.servers", "127.0.0.01:9092") >> >> properties.put("zookeeper.connect", "127.0.0.1:2181") >> >> >> >> new FlinkKafkaProducer011[Person]("persons", serSchema, properties) >> >> } >> >> >> >> } >> >> >> >> The code does compile, however it gives the following error on >> runtime: InvalidProgramException: Object >> org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d >> is not serializable. >> >> >> >> I assume this means that my custom SerializationSchema is not >> serializable due to the use of SchemaFor, FromRecord
Re: KafkaProducer with generic (Avro) serialization schema
Hi, Thanks for the suggestions. Unfortunately I cannot make FromRecord/ToRecord/SchemaFor serializable, since those classes are out of my control. I use them from the avro4s library (https://github.com/sksamuel/avro4s). The problem here, especially with the deserializer, is that I need to convert an Avro 'GenericRecord' to a Scala case class. Avro is written in Java, so that's a bit problematic, and therefore I need the avro4s library. Avro4s tries to verify at compile time whether the generic is actually convertible from/to a generic record; that is why I need those context bounds. As for @Aljoscha's workaround, I don't understand how this would solve it. Doesn't that just move the problem? If I create a factory, I still need the generic (with context bounds) that I specify at my KafkaConsumer/deserialization schema. @Fabian I'm not sure if I understand your proposal. I still need the context bounds for those compile-time macros of avro4s. Once again, thanks for your help so far! Regards, Wouter On Wed 2 May 2018 at 16:48, Fabian Hueske wrote: > Hi Wouter, > > you can try to make the SerializationSchema serializable by overriding > Java's serialization methods writeObject() and readObject(), similar to how > Flink's AvroRowSerializationSchema [1] does it. > > Best, Fabian > > [1] > https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java > > 2018-05-02 16:34 GMT+02:00 Piotr Nowojski : > >> Hi, >> >> My Scala knowledge is very limited (and my Scala serialization >> knowledge is non-existent), but one way or another you have to make your >> SerializationSchema serialisable. If indeed this is the problem, maybe a >> better place to ask this question is on Stack Overflow or some Scala- >> specific mailing list/board (unless someone else from the Flink community >> can provide an answer to this problem)? 
>> >> Piotrek >> >> On 1 May 2018, at 16:30, Wouter Zorgdrager wrote: >> >> So, I'm still struggling with this issue. I dived a bit more into the >> problem and I'm pretty sure that the problem is that I have to (implicitly) >> pass the SchemaFor and RecordTo classes to my serialization schema >> (otherwise I can't make it generic). However those class aren't >> serializable, but of course I can't annotate them transient nor make it a >> lazy val which gives me the current issue. >> >> I hope someone has some leads for me. >> >> Thanks! >> >> Op do 26 apr. 2018 om 23:03 schreef Wouter Zorgdrager < >> zorgdrag...@gmail.com>: >> >>> Hi Bill, >>> >>> Thanks for your answer. However this proposal isn't going to solve my >>> issue, since the problem here is that the context bounds I need to give in >>> order to serialize it to Avro (SchemaFor, ToRecord and FromRecord) aren't >>> serializable classes. This results in Flink not being able to serialize the >>> KafkaProducer failing the whole job. >>> >>> Thanks, >>> Wouter >>> >>> Op do 26 apr. 2018 om 00:42 schreef Nortman, Bill < >>> william.nort...@pimco.com>: >>> >>>> The things I would try would first in you are you class Person and >>>> Address have getters and setters and a no argument constructor. >>>> >>>> >>>> >>>> *From:* Wouter Zorgdrager [mailto:zorgdrag...@gmail.com] >>>> *Sent:* Wednesday, April 25, 2018 7:17 AM >>>> *To:* user@flink.apache.org >>>> *Subject:* KafkaProducer with generic (Avro) serialization schema >>>> >>>> >>>> >>>> Dear reader, >>>> >>>> >>>> >>>> I'm currently working on writing a KafkaProducer which is able to >>>> serialize a generic type using avro4s. >>>> >>>> However this serialization schema is not serializable itself. 
Here is >>>> my code for this: >>>> >>>> >>>> >>>> The serialization schema: >>>> >>>> class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] >>>> extends SerializationSchema[IN] { >>>> >>>> >>>> >>>> override def serialize(element: IN): Array[Byte] = { >>>> >>>> val byteArray = new ByteArrayOutputStream() >>>> >>>> val avroSer = AvroOutputStream.binary[IN](byteArray) >>>> >>>> av
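Fabian's suggestion above, excluding the offending members from the serialized form and re-deriving them on deserialization, is expressed in Java terms (writeObject()/readObject()). Since the later threads in this archive are about PyFlink, the same pattern can be sketched in Python with __getstate__/__setstate__. The class and member names below are made up for illustration; avro4s itself is not involved.

```python
import pickle

class AvroLikeSchema:
    """Hypothetical schema holding a non-picklable member (a lambda).
    __getstate__/__setstate__ drop it before pickling and re-derive it
    afterwards, mirroring the writeObject()/readObject() trick on a
    Java SerializationSchema."""

    def __init__(self):
        self._encoder = lambda s: s.encode("utf-8")  # lambdas can't be pickled

    def serialize(self, element: str) -> bytes:
        return self._encoder(element)

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["_encoder"]  # exclude the non-serializable member
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # re-derive the member on deserialization
        self._encoder = lambda s: s.encode("utf-8")

# Round-trip through pickle succeeds even though a lambda is involved.
restored = pickle.loads(pickle.dumps(AvroLikeSchema()))
assert restored.serialize("hello") == b"hello"
```

The key point is that the re-derivation must be possible from serializable state alone, which is exactly what fails here: the avro4s type classes are materialized at the call site by compile-time macros.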
Side outputs PyFlink
Dear Flink community, First of all, I'm very excited about the new 1.13 release. Among other features, I'm particularly excited about the support for stateful operations in Python. I think it will make the wonders of stream processing and the power of Flink accessible to more developers. I'm currently playing around a bit with these new features and I was wondering if there are already plans to support side outputs in the Python API? This already works pretty neatly in the DataStream API, but I couldn't find any communication on adding this to PyFlink. In the meantime, what do you suggest as a workaround for side outputs? Intuitively, I would copy a stream and add a filter for each side output, but this seems a bit inefficient: in that setup, each side output needs to go over the complete stream. Any ideas? Thanks in advance! Regards, Wouter
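The cost difference between the two approaches can be shown in plain Python (this is not PyFlink API, just a sketch of the idea): with N side outputs, the copy-plus-filter workaround evaluates N predicates per element, while tag-based routing, which is essentially what side outputs give you, inspects each element exactly once.

```python
from collections import defaultdict

def route(stream, tag_fn):
    """Single-pass routing: each element is inspected once and appended
    to the output bucket named by its tag. The filter-per-output
    workaround instead runs every element through every filter."""
    outputs = defaultdict(list)
    for element in stream:
        outputs[tag_fn(element)].append(element)
    return outputs

events = [1, -2, 3, -4, 0]
outs = route(events, lambda x: "negative" if x < 0 else "main")
assert outs["main"] == [1, 3, 0]
assert outs["negative"] == [-2, -4]
```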
PyFlink DataStream union type mismatch
Dear all, I'm having trouble unifying two data streams using the union operator in PyFlink. My code basically looks like this:

init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

print(init_stream)
print(init_stream.get_type())
print(stateful_operator_stream.get_type())
print(stateful_operator_stream)

final_operator_stream = (init_stream
    .union(stateful_operator_stream)
    .process(stateful_operator)
)

In short, I have a datastream (operator_stream) of type Tuple[str, Event] which I define as a tuple of Types.STRING() and Types.PICKLED_BYTE_ARRAY(). When calling the union operator, I get an error which shows a type mismatch between both streams: py4j.protocol.Py4JJavaError: An error occurred while calling o732.union. 
: java.lang.IllegalArgumentException: Cannot union streams of different types: Java Tuple2 and Row(f0: String, f1: Java Tuple2) at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:834) However, when I print the types of both datastreams they seem similar: RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo)) RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo)) Any thoughts? Thanks in advance! Regards, Wouter
Re: PyFlink DataStream union type mismatch
Hi Dian, all, Thanks, that indeed solved my problem. I have two more questions (I'm not sure whether it is better practice to send a new email to the mailing list or to re-use a thread): 1. I noticed very high latency (multiple seconds per message) for a job with multiple operators and very low throughput. I suspect this is because messages are bundled until a size threshold or time threshold is met (and in a low-throughput scenario, only the time threshold is triggered). This is also the idea I get when reading the configuration page [1]. However, these configuration values seem to be targeted at the Table API and it is unclear to me how to configure this for the DataStream API. To be clear, this is in PyFlink. 2. I'm using the JVM Kafka consumer and producer for my Python job. Therefore I had to add the flink-connector-sql-kafka jar to my Flink environment. I did this by downloading the jar file from Maven and putting it under 'venv/pyflink/lib'. Is there any easier way? I'm not particularly a fan of manually changing my venv. I tried to use stream_execution_environment.add_jars but that was unsuccessful; I still got a ClassNotFoundException. Hope you can help. As always, thanks a lot! Regards, Wouter [1] - https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/python/python_config/ On Fri, 21 May 2021 at 05:25, Dian Fu wrote: > Hi Wouter, > > 1) For the exception, it seems a bug. 
I have filed a ticket for it: > https://issues.apache.org/jira/browse/FLINK-22733 > > 2) Regarding your requirements, I guess you should do it as follows: > ``` > > init_stream = (operator_stream >.filter(lambda r: r[0] is None) > > .map(init_operator,Types.TUPLE([Types.STRING(),Types.PICKLED_BYTE_ARRAY()])) > ) > > stateful_operator_stream = (operator_stream > .filter(lambda r: r[0] is not None) > .map(lambda x: (x[0], x[1]),Types.TUPLE([Types.STRING(), > Types.PICKLED_BYTE_ARRAY()])) > ) > > > > init_stream.union(stateful_operator_stream).key_by(lambda x: x[0 > ],Types.STRING()) > > ``` > > The reason is that `union` turns a `KeyedStream` into a `DataStream`, and > you can no longer perform stateful operations on a `DataStream`. > > Regards, > Dian > > On 21 May 2021 at 12:38 AM, Wouter Zorgdrager wrote: > > Dear all, > > I'm having trouble unifying two data streams using the union operator in > PyFlink. My code basically looks like this: > > init_stream = (operator_stream >.filter(lambda r: r[0] is None) > > .map(init_operator,Types.TUPLE([Types.STRING(),Types.PICKLED_BYTE_ARRAY()])) >.key_by(lambda x: x[0], Types.STRING()) > ) > > stateful_operator_stream = (operator_stream > .filter(lambda r: r[0] is not None) > .map(lambda x: (x[0], x[1]),Types.TUPLE([Types.STRING(), > Types.PICKLED_BYTE_ARRAY()])) > .key_by(lambda x: x[0],Types.STRING()) > ) > print(init_stream)print(init_stream.get_type()) > print(stateful_operator_stream.get_type())print(stateful_operator_stream) > > final_operator_stream = init_stream > .union(stateful_operator_stream) > .process(stateful_operator) > > > In short, I have a datastream (operator_stream) of type Tuple[str, Event] > which I define as a tuple of Types.STRING() and Types.PICKLED_BYTE_ARRAY(). > When calling the union operator, I get an error which shows a type mismatch > between both streams: > > py4j.protocol.Py4JJavaError: An error occurred while calling o732.union. 
> : java.lang.IllegalArgumentException: Cannot union streams of different > types: Java Tuple2 and Row(f0: String, f1: > Java Tuple2) > at > org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.base/java
ByteSerializationSchema in PyFlink
Hi all, I have a PyFlink job connected to a KafkaConsumer and Producer. I want to directly work with the bytes from and to Kafka because I want to serialize/deserialize in my Python code rather than in the JVM environment. Therefore, I can't use the SimpleStringSchema for (de)serialization (the messages aren't strings anyway). I've tried to create a TypeInformationSerializationSchema with Types.BYTE(), see the code snippet below:

class ByteSerializer(SerializationSchema, DeserializationSchema):
    def __init__(self, execution_environment):
        gate_way = get_gateway()
        j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
            Types.BYTE().get_java_type_info(),
            get_j_env_configuration(execution_environment),
        )
        SerializationSchema.__init__(self, j_serialization_schema=j_byte_string_schema)
        DeserializationSchema.__init__(self, j_deserialization_schema=j_byte_string_schema)

The ByteSerializer is used like this:

return FlinkKafkaConsumer(
    ["client_request", "internal"],
    ByteSerializer(self.env._j_stream_execution_environment),
    {
        "bootstrap.servers": "localhost:9092",
        "auto.offset.reset": "latest",
        "group.id": str(uuid.uuid4()),
    },
)

However, this does not seem to work. I think the error is thrown in the JVM environment, which makes it a bit hard to parse in my Python stack trace, but I think it boils down to this stacktrace part: answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat java.base/java.lang.Thread.run(Thread.java:834)\\n' gateway_client = target_id = None name = 'org.apache.flink.api.common.serialization.TypeInformationSerializationSchema' def get_return_value(answer, gateway_client, target_id=None, name=None): """Converts an answer received from the Java gateway into a Python object. 
For example, string representation of integers are converted to Python integer, string representation of objects are converted to JavaObject instances, etc. :param answer: the string returned by the Java gateway :param gateway_client: the gateway client used to communicate with the Java Gateway. Only necessary if the answer is a reference (e.g., object, list, map) :param target_id: the name of the object from which the answer comes from (e.g., *object1* in `object1.hello()`). Optional. :param name: the name of the member from which the answer comes from (e.g., *hello* in `object1.hello()`). Optional. """ if is_error(answer)[0]: if len(answer) > 1: type = answer[1] value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) if answer[1] == REFERENCE_TYPE: raise Py4JJavaError( "An error occurred while calling {0}{1}{2}.\n". format(target_id, ".", name), value) else: > raise Py4JError( "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n". format(target_id, ".", name, value)) E py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema. 
Trace: E org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class org.apache.flink.configuration.Configuration]) does not exist E at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179) E at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196) E at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:237) E at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80) E at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69) E at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) E at java.base/java.lang.Thread.run(Thread.java:834) I hope you can help me out! Thanks in advance, Wouter
Re: ByteSerializationSchema in PyFlink
Hi Dian, all, Thanks for your suggestion. Unfortunately, it does not seem to work. I get the following exception: Caused by: java.lang.NegativeArraySizeException: -2147183315 at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81) at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:31) at org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.deserialize(TypeInformationSerializationSchema.java:92) at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) To be more precise, the messages in my Kafka topic are pickled Python objects. Maybe that is the reason for the exception, I also tried using Types.PICKLED_BYTE_ARRAY().get_java_type_info() but I think that has the same serializer because I get the same exception. Any suggestions? Thanks for your help! 
Regards, Wouter On Fri, 4 Jun 2021 at 08:24, Dian Fu wrote: > Hi Wouter, > > E org.apache.flink.api.python.shaded.py4j.Py4JException: > Constructor > org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class > org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class > org.apache.flink.configuration.Configuration]) does not exist > > > As the exception indicates, the constructor doesn’t exist. > > > > Could you try the following: > > ``` > j_type_info= Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info() > j_type_serializer= > > j_type_info.createSerializer(gate_way.jvm.org.apache.flink.api.common.ExecutionConfig()) > > j_byte_string_schema = > gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(j_type_info, > j_type_serializer) > > ``` > > Regards, > Dian > > On 3 Jun 2021 at 8:51 PM, Wouter Zorgdrager wrote: > > Hi all, > > I have a PyFlink job connected to a KafkaConsumer and Producer. I want to > directly work with the bytes from and to Kafka because I want to > serialize/deserialize in my Python code rather than the JVM environment. > Therefore, I can't use the SimpleStringSchema for (de)serialization (the > messages aren't strings anyways). 
I've tried to create a > TypeInformationSerializer with Types.BYTE(), see the code snippet below: > > class ByteSerializer(SerializationSchema, DeserializationSchema): > def __init__(self, execution_environment): > gate_way = get_gateway() > > j_byte_string_schema = > gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema( > Types.BYTE().get_java_type_info(), > get_j_env_configuration(execution_environment), > ) > SerializationSchema.__init__(self, > j_serialization_schema=j_byte_string_schema) > DeserializationSchema.__init__( > self, j_deserialization_schema=j_byte_string_schema > )The ByteSerializer is used like this: > > > return FlinkKafkaConsumer( > ["client_request", "internal"], > ByteSerializer(self.env._j_stream_execution_environment), > { > "bootstrap.servers": "localhost:9092", > "auto.offset.reset": "latest", > "group.id": str(uuid.uuid4()), > }, > ) > However, this does not seem to work. I think the error is thrown in the JVM > environment, which makes it a bit hard to parse in my Python stack trace, > > but I think it boils down to this stacktrace part: > > > answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException: > Constructor > org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat > java.base/java.lang.Thread.run(Thread.java:834)\\n' > gateway_client = > target_id = None > name =
Re: ByteSerializationSchema in PyFlink
Hi Dian, all, The way I resolved it for now is to write my own custom serializer which only maps bytes to bytes. See the code below (the byte[] generics were stripped by the mail renderer; restored here):

public class KafkaBytesSerializer implements SerializationSchema<byte[]>, DeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] bytes) throws IOException {
        return bytes;
    }

    @Override
    public boolean isEndOfStream(byte[] bytes) {
        return false;
    }

    @Override
    public byte[] serialize(byte[] bytes) {
        return bytes;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return TypeInformation.of(byte[].class);
    }
}

This code is packaged in a jar and uploaded through env.add_jars. That works like a charm! Thanks for the help! Wouter On Fri, 4 Jun 2021 at 14:40, Wouter Zorgdrager wrote: > Hi Dian, all, > > Thanks for your suggestion. Unfortunately, it does not seem to work. I get > the following exception: > > Caused by: java.lang.NegativeArraySizeException: -2147183315 > at > org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81) > at > org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:31) > at > org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.deserialize(TypeInformationSerializationSchema.java:92) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) > at > 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) > > To be more precise, the messages in my Kafka topic are pickled Python > objects. Maybe that is the reason for the exception, I also tried using > Types.PICKLED_BYTE_ARRAY().get_java_type_info() > but I think that has the same serializer because I get the same exception. > > Any suggestions? Thanks for your help! > > Regards, > Wouter > > On Fri, 4 Jun 2021 at 08:24, Dian Fu wrote: > >> Hi Wouter, >> >> E org.apache.flink.api.python.shaded.py4j.Py4JException: >> Constructor >> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class >> org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class >> org.apache.flink.configuration.Configuration]) does not exist >> >> >> As the exception indicate, the constructor doesn’t exists. >> >> >> >> Could you try with the following: >> >> ``` >> j_type_info= Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info() >> j_type_serializer= >> >> j_type_info.createSerializer(gate_way.jvm.org.apache.flink.api.common.ExecutionConfig()) >> >> j_byte_string_schema = >> gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(j_type_info, >> j_type_serializer) >> >> ``` >> >> Regards, >> Dian >> >> 2021年6月3日 下午8:51,Wouter Zorgdrager 写道: >> >> Hi all, >> >> I have a PyFlink job connected to a KafkaConsumer and Producer. I want to >> directly work with the bytes from and to Kafka because I want to >> serialize/deserialize in my Python code rather than the JVM environment. >> Therefore, I can't use the SimpleStringSchema for (de)serialization (the >> messages aren't strings anyways). 
I've tried to create a >> TypeInformationSerializer with Types.BYTE(), see the code snippet below: >> >> class ByteSerializer(SerializationSchema, DeserializationSchema): >> def __init__(self, execution_environment): >> gate_way = get_gateway() >> >> j_byte_string_schema = >> gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema( >> Types.BYTE().get_java_type_info(), >> get_j_env_configuration(execution_environment), >> ) >> SerializationSchema.__init__(self, >> j_serialization_schema=j_byte_str
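With a pass-through schema like the KafkaBytesSerializer above on the JVM side, all real (de)serialization moves into Python. A minimal, self-contained sketch of that Python side; Event is a hypothetical payload type, and pickle is one possible encoding, matching the pickled objects mentioned earlier in the thread:

```python
import pickle
import uuid
from dataclasses import dataclass

@dataclass
class Event:              # hypothetical payload type
    key: str
    value: int

def to_kafka_bytes(event: Event) -> bytes:
    """Python-side serialization; the JVM schema just passes these bytes on."""
    return pickle.dumps(event)

def from_kafka_bytes(raw: bytes) -> Event:
    """Python-side deserialization of the raw bytes from Kafka."""
    return pickle.loads(raw)

e = Event(key=str(uuid.uuid4()), value=42)
raw = to_kafka_bytes(e)   # what the identity schema would hand to Kafka
assert isinstance(raw, bytes)
assert from_kafka_bytes(raw) == e
```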
PyFlink performance and deployment issues
Dear community, I have been struggling a lot with the deployment of my PyFlink job. Moreover, the performance seems to be very disappointing, especially the latency at low throughput. I have been playing around with configuration values, but it has not been improving. In short, I have a DataStream job with multiple Python operators including a ProcessFunction. The job reads from Kafka and writes to Kafka again. For single events, E2E latency has been somewhere between 600ms and 2000ms. When I increase throughput, latency grows to the order of seconds. This is when I configure my job like this:

config.set_integer("python.fn-execution.bundle.time", 1)
config.set_integer("python.fn-execution.bundle.size", 1)

I tried several configuration values, but the results are similar. Interestingly, I have a similar Python streaming application written in Apache Beam which does achieve low latency; single events are processed in < 30ms. If I recall correctly, it uses the same technique of bundling and sending to Python processes. On the other hand, Beam uses an in-memory runner when running locally, which might change the situation. I'm not sure how that compares to a local Flink MiniCluster. I hoped that performance might improve when I deploy this on a (remote) Flink cluster. Unfortunately, I had a lot of trouble deploying this PyFlink job to a remote Flink cluster. In my first attempt, I created a local TM + JM setup and tried to deploy it using the "./flink run" command. However, this command created a local MiniCluster again rather than submitting it to my remote cluster. The full command was:

./flink run --target remote \
  -m localhost:8081 \
  -pyarch venv.zip \
  -pyexec venv.zip/venv/bin/python \
  --parallelism 4 \
  --python ~/Documents/runtime.py \
  --jarfile ~/Documents/combined.jar

Note that venv.zip stores all the Python dependencies for my PyFlink job whereas combined.jar stores the Java dependencies. 
I tried several variants of this command, but it never submitted to my running JobManager and always ran locally. In my second attempt, I tried deploying it to a Kubernetes cluster using the following command:

./flink run-application \
  --target kubernetes-application \
  -Dkubernetes.cluster-id=flink-cluster \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dkubernetes.service-account=flink-service-account \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dkubernetes.container.image=pyflink:latest \
  -pyarch venv.zip \
  -pyexec venv.zip/venv/bin/python \
  --parallelism 4 \
  -py ~/Documents/runtime.py \
  --jarfile ~/Documents/combined.jar

I created the pyflink:latest image by following the documentation here [1]. It was unclear to me whether I had to include my project files in this Docker image. When running it like this, it did submit to the remote K8s cluster, but I got an exception that it could not find my runtime.py file in some sort of tmp folder. Lastly, I wondered if it is possible to set a key for events sent to the KafkaProducer. Right now, it seems you can only configure some (static) properties and the serializer. Is there a workaround to be able to set the key and value of an event in PyFlink? I hope you can help me out with my struggles! Thanks in advance. Regards, Wouter [1] - https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python
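On the last question, setting a Kafka key per event, one possible workaround (an assumption for illustration, not an official PyFlink API) is to embed the key in the value bytes and unpack it on the consumer side:

```python
import pickle

def encode_with_key(key, value) -> bytes:
    """Hypothetical workaround: pack (key, value) into the message value,
    since the producer wrapper exposes no per-record Kafka key here.
    Note that broker-side partitioning by key is lost this way; a
    key_by() after the source restores the grouping inside Flink."""
    return pickle.dumps((key, value))

def decode_with_key(raw: bytes):
    """Consumer side: unpack the embedded (key, value) pair."""
    return pickle.loads(raw)

raw = encode_with_key("user-1", {"amount": 10})
assert decode_with_key(raw) == ("user-1", {"amount": 10})
```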
Re: PyFlink performance and deployment issues
Hi Dian, all,

I will come back to the other points asap. However, I'm still confused about the performance. Is this what I can expect from PyFlink in terms of performance, ~1000ms latency for single events? I also had a very simple setup where I sent 1000 events per second to Kafka, and response times/latencies were around 15 seconds for single events. I understand there is some Python/JVM overhead, but since Flink is so performant, I would expect much better numbers. In the current situation, PyFlink would just be unusable if you care about latency. Is this something that you expect to be improved in the future?

I will verify how this works out for Beam in a remote environment.

Thanks again!
Wouter

On Thu, 8 Jul 2021 at 08:28, Dian Fu wrote:

> Hi Wouter,
>
> 1) Regarding the performance difference between Beam and PyFlink, I guess
> it's because you are using an in-memory runner when running it locally in
> Beam. In that case, the code path is totally different compared to
> running in a remote cluster.
> 2) Regarding `flink run`, I'm surprised that it's running locally.
> Could you submit a Java job with similar commands to see how it runs?
> 3) Regarding `flink run-application`, could you share the exception
> stack?
>
> Regards,
> Dian
>
> On 6 Jul 2021 at 16:58, Wouter Zorgdrager wrote:
Re: PyFlink performance and deployment issues
Hi Xingbo, all,

That is good to know, thank you. Is there any Jira issue I can track? I'm curious to follow this progress! Do you have any recommendations with regard to these two configuration values, to get somewhat reasonable performance?

Thanks a lot!
Wouter

On Thu, 8 Jul 2021 at 10:26, Xingbo Huang wrote:

> Hi Wouter,
>
> In fact, our users have encountered the same problem. Whenever the `bundle
> size` or `bundle time` threshold is reached, the data in the buffer needs
> to be sent from the JVM to the PVM, and the JVM then waits for the PVM to
> process it and send all the results back before they go to the downstream
> operator. This leads to a large delay, especially for single small events,
> since small bundles are hard to process in a pipelined fashion.
>
> I have been working on this problem recently and I plan to include the
> optimization in release 1.14.
>
> Best,
> Xingbo
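The effect Xingbo describes can be put into a rough back-of-envelope model (my own illustration, not Flink code): an event sits in the JVM-side buffer until either the bundle-size or the bundle-time threshold fires, and then pays one JVM-to-Python round trip.

```python
def approx_latency_ms(bundle_size: int, bundle_time_ms: float,
                      arrival_interval_ms: float, round_trip_ms: float) -> float:
    """Rough latency estimate for the first event of a bundle: it waits
    until bundle_size events have arrived or bundle_time_ms elapses,
    whichever comes first, then pays the JVM <-> Python round trip."""
    wait_ms = min(bundle_time_ms, (bundle_size - 1) * arrival_interval_ms)
    return wait_ms + round_trip_ms

# With bundle size 1 the wait vanishes and only the round trip remains,
# which is why tiny bundles trade throughput for latency.
```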
Fwd: PyFlink performance and deployment issues
Hi all,

I'm still dealing with the PyFlink deployment issue as described below. I see that I accidentally didn't forward it to the mailing list. Anyway, my job is stuck in `Initializing` and the logs don't really give me a clue about what is going on. In my IDE it runs fine. The command I use to submit to the cluster:

export PYFLINK_CLIENT_EXECUTABLE=~/Documents/stateflow-evaluation/venv/bin/python
./flink run \
  --target remote \
  -m localhost:8081 \
  -pyarch venv.zip \
  --pyExecutable venv.zip/venv/bin/python \
  --parallelism 1 \
  --python ~/Documents/stateflow-evaluation/pyflink_runtime.py \
  --jarfile ~/Documents/stateflow-evaluation/benchmark/bin/combined.jar

I hope someone can help me with this because it is a blocker for me.

Thanks in advance,
Wouter

---------- Forwarded message ---------
From: Wouter Zorgdrager
Date: Thu, 8 Jul 2021 at 12:20
Subject: Re: PyFlink performance and deployment issues
To: Xingbo Huang

Hi Xingbo, all,

Regarding point 2, I actually made a mistake there. I picked port 8081 (the WebUI port) rather than the job submission port (--target remote -m localhost:8081). For some reason, this does not give an error or warning and just starts a local cluster. However, now I have another issue: my job is stuck at initialization. Here is an excerpt from the JM log:

2021-07-08 12:12:18,094 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying _stream_key_by_map_operator (1/1) (attempt #0) with attempt id ca9abcc644c05f62a47b83f391c85cd9 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Process-Stateful-User (1/1) (attempt #0) with attempt id f40fac621cb94c79cdb82146ae5521bb to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (attempt #0) with attempt id 58deef879a00052ba6b3447917005c35 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,484 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,488 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - _stream_key_by_map_operator (1/1) (ca9abcc644c05f62a47b83f391c85cd9) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,489 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,490 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Route-Incoming-Events -> (Filter-On-User -> Map -> (Filter-Init-User -> Init-User, Filter-Stateful-User), Filter -> Map) (1/1) (c48649bd76abaf77486104e8cfcee7d8) switched from DEPLOYING to INITIALIZING.
I run with parallelism 1, and these are the latest log lines from the TM (there is no obvious error):

2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 5.
2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1 milliseconds.
2021-07-08 12:12:18,791 WARN org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Error while loading kafka-version.properties: inStream parameter is null
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: unknown
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: unknown
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1625739138789
2021-07-08 12:12:18,806 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic clie
Can't start FlinkKafkaProducer using SSL
Hi all,

I'm trying to deploy a FlinkKafkaProducer in PyFlink on a remote cluster. Unfortunately, I'm getting the following exception:

Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
    ... 22 more
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:147)
    ... 27 more

My Kafka
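For what it's worth, this particular `LoginException` is commonly caused by class relocation in Flink's shaded Kafka connector: a JAAS config naming the original `org.apache.kafka...PlainLoginModule` fails because that class only exists under its relocated name inside the shaded jar. A sketch of producer properties using the relocated class name (broker address and credentials below are placeholders):

```python
# Producer properties for SASL over SSL with Flink's *shaded* Kafka
# connector. Note the "org.apache.flink.kafka.shaded." prefix on the
# login module class; broker and credentials are placeholders.
producer_properties = {
    "bootstrap.servers": "broker:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.jaas.config": (
        "org.apache.flink.kafka.shaded."
        "org.apache.kafka.common.security.plain.PlainLoginModule "
        'required username="user" password="secret";'
    ),
}
```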
Row to tuple conversion in PyFlink when switching to 'thread' execution mode
Dear readers,

I'm running into some unexpected behaviour in PyFlink when switching the execution mode from process to thread. In thread mode, my `Row` gets converted to a tuple whenever I use a UDF in a map operation. Through this conversion to tuples, we lose critical information such as column names. Below is a minimal working example (mostly taken from the documentation):

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")

# This does work:
t_env.get_config().set("python.execution-mode", "process")
# This doesn't work:
# t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)


# Map operation with a Python general scalar function.
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)

table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```

This results in the following exception:

2024-03-28 16:32:10 Caused by: pemja.core.PythonException: : 'tuple' object has no attribute 'a'
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
2024-03-28 16:32:10 at .(:1)
2024-03-28 16:32:10 at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)

Note that in process mode this works perfectly fine.
Is this expected behaviour, and/or is there a workaround?

Kind regards,
Wouter
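One possible workaround (a sketch, not an official recommendation): `Row` also supports positional indexing, so a UDF that reads its input by index instead of by attribute behaves the same whether it receives a `Row` (process mode) or a plain tuple (thread mode). Whether the returned tuple is accepted for a ROW result type may depend on the PyFlink version; the point here is the positional input access:

```python
def compute(a0, b0):
    # The pure computation, independent of the container type.
    return a0 + 1, b0 * b0

def map_function(a):
    # Unpack positionally: works for both pyflink.common.Row
    # (process mode) and the plain tuple thread mode passes in,
    # since Row supports indexing. Field order: (a, b).
    return compute(a[0], a[1])
```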