Statefun 2.0 questions

2020-05-06 Thread Wouter Zorgdrager
Hi all,

I've been using Flink for quite some time now, and for a university project
I'm planning to experiment with StateFun. During the walkthrough I ran
into some issues that I hope you can help me with.

1) Is it correct that the Docker image of statefun is not yet published? I
couldn't find it anywhere, but was able to run it by building the image
myself.
2) The example project using the Python SDK uses Flask to expose a
function via POST. Is there also a way to serve GET requests, so that you
can trigger a stateful function from, for instance, your browser?
3) Do you expect a lot of performance loss when using the Python SDK over
Java?

Thanks in advance!

Regards,
Wouter


Re: Statefun 2.0 questions

2020-05-07 Thread Wouter Zorgdrager
Hi Igal,

Thanks for your quick reply. Getting back to point 2, I was wondering
whether you could indeed trigger a stateful function directly from Flask
and also get the reply there, instead of using Kafka in between. We want to
experiment with running stateful functions behind a front-end (which should
be able to trigger a function), but we're a bit afraid that using Kafka
doesn't scale well if, on the front-end side, each user has to consume all
Kafka messages to find the correct reply/output for a certain request/input.
Any thoughts?

Thanks in advance,
Wouter
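
The concern above (every user scanning all Kafka messages to find their own reply) is often handled with a correlation ID. The following is only a minimal sketch, not something StateFun provides: it assumes the front-end attaches a request ID to each ingress message and that the function echoes it back in the egress message. `ReplyRouter` and its methods are hypothetical names, and the Kafka consumer loop that would call `on_egress_message` is left out.

```python
import uuid
from concurrent.futures import Future
from typing import Dict, Tuple


class ReplyRouter:
    """Hands each egress message to the one request that is waiting for it."""

    def __init__(self) -> None:
        # request id -> future that the waiting HTTP handler blocks on
        self._pending: Dict[str, Future] = {}

    def register(self) -> Tuple[str, Future]:
        """Create a request id to send along with the ingress message."""
        request_id = uuid.uuid4().hex
        future: Future = Future()
        self._pending[request_id] = future
        return request_id, future

    def on_egress_message(self, request_id: str, payload: str) -> bool:
        """Called by the single consumer of the egress topic for every message."""
        future = self._pending.pop(request_id, None)
        if future is None:
            return False  # not ours, or already answered
        future.set_result(payload)
        return True


router = ReplyRouter()
request_id, reply = router.register()
router.on_egress_message(request_id, "done")
print(reply.result(timeout=1))
```

With this shape, one consumer per front-end instance reads the egress topic, and each HTTP handler only waits on its own future instead of reading the topic itself.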

On Thu, 7 May 2020 at 10:51, Igal Shilman wrote:

> Hi Wouter!
>
> Glad to read that you have been using Flink for quite some time and are
> also exploring StateFun!
>
> 1) Yes, that is correct; you can follow the Docker Hub contribution PR at
> [1].
>
> 2) I’m not sure I understand what you mean by triggering from the browser.
> If you mean triggering the function independently of StateFun, for testing
> or illustration purposes, you would need to write some JavaScript and
> perform the POST (assuming CORS is enabled).
> Let me know if you’d like further information on how to do it.
> Broadly speaking, GET is traditionally used to get data from a resource
> and POST to send data (the data is the invocation batch in our case).
>
> One easier workaround would be to expose another endpoint in your Flask
> application and call your stateful function directly from there (possibly
> populating the function argument with values taken from the query
> params).
>
> 3) I would expect a performance loss when going from the embedded SDK to
> the remote one, simply because the remote function runs in a different
> process and a round trip is required. There are different ways of
> deploying even remote functions.
> For example, they can be co-located with the task managers and communicate
> via the loopback device or a Unix domain socket, or they can be deployed
> behind a load balancer with an auto-scaler, and thus react to higher
> request rates or latency increases by spinning up new instances (something
> that is not yet supported with the embedded API).
>
> Good luck,
> Igal.
>
>
>
>
>
> [1] https://github.com/docker-library/official-images/pull/7749
>
>
>
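
The workaround Igal describes (an extra endpoint that calls the function logic directly, with arguments taken from the query parameters) could look roughly like the sketch below. It uses only the standard library so that it runs on its own; in the Flask application the body of `handle_get` would sit inside an additional GET route. `greet`, `handle_get`, and the `name`/`seen` parameters are made up for illustration. Note that calling the logic this way bypasses StateFun, so it is only useful for testing or illustration.

```python
from urllib.parse import parse_qs, urlparse


def greet(name: str, seen: int) -> str:
    """Hypothetical stand-in for the logic inside your stateful function."""
    return f"Hello {name}, seen {seen} time(s)"


def handle_get(url: str) -> str:
    """Build the function arguments from the query string and call the logic."""
    query = parse_qs(urlparse(url).query)
    # parse_qs returns a list per key; fall back to a default when a key is absent
    name = query.get("name", ["anonymous"])[0]
    seen = int(query.get("seen", ["1"])[0])
    return greet(name, seen)


print(handle_get("/trigger?name=Wouter&seen=3"))
```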


Re: Statefun 2.0 questions

2020-05-12 Thread Wouter Zorgdrager
Hi Igal, all,

In the meantime we found a way to serve Flink stateful functions from a
frontend. We decided to add another (set of) Flask application(s) which
link to Kafka topics. These Kafka topics then serve as ingress and egress
for the StateFun cluster. However, we're wondering how we can scale this
cluster. The documentation page provides some nice figures for different
setups, but no implementation details. In our case we are using a remote
cluster, so we have a Docker instance containing the
`python-stateful-function` and of course the Flink cluster containing a
`master` and a `worker`. If I understood correctly, in a remote setting we
can scale both the Flink cluster and the `python-stateful-function`.
Scaling the Flink cluster is trivial because I can add more
workers/task managers (providing more task slots) just by scaling the worker
instance. However, how can I scale the stateful function while also ensuring
that it ends up in the correct Flink job (because we need shared state
there)? I tried scaling the Docker instance as well, but that didn't seem to
work.

Hope you can give me some leads there.
Thanks in advance!

Kind regards,
Wouter



Re: Statefun 2.0 questions

2020-05-13 Thread Wouter Zorgdrager
Dear Igal, all,

Thanks a lot. This is very helpful. I understand the architecture a bit
better now. We can just scale the stateful functions and put a load balancer
in front, and Flink will contact them. The only part of the scaling I don't
understand yet is how to scale the 'Flink side'. If I understand correctly,
the Kafka ingress/egress part runs on the Flink cluster and contacts the
remote workers through HTTP. How can I scale this Kafka part then? For a
normal Flink job I would just change the parallelism, but I couldn't find
that option yet. Is there some value I need to set in the module.yaml?

Once again, thanks for the help so far. It has been useful.

Regards,
Wouter

On Wed, 13 May 2020 at 00:03, Igal Shilman wrote:

> Hi Wouter,
>
> Triggering a stateful function from a frontend indeed requires an ingress
> between them, so the way you've approached this is also the way we were
> thinking of.
> As Gordon mentioned a potential improvement might be an HTTP ingress, that
> would allow triggering stateful functions directly from the front end
> servers.
> But this kind of ingress is not implemented yet.
>
> Regarding scaling: your understanding is correct; you can scale the Flink
> cluster and the remote "python-stateful-function" cluster independently.
> Scaling the Flink cluster, though, requires taking a savepoint, bumping the
> job parallelism, and starting the cluster with more workers from the
> savepoint taken previously.
>
> Scaling the "python-stateful-function" workers can be done transparently to
> the Flink cluster, but the exact details are deployment-specific. For
> example:
> - the Python workers are a Kubernetes service,
> - or the Python workers are deployed behind a load balancer,
> - or you add new entries to the DNS record of your Python worker.
>
> I didn't understand "ensuring that it ends up in the correct Flink job";
> can you please clarify?
> Flink would be the one contacting the remote workers, not the other way
> around. So as long as the new instances are visible to Flink, they will be
> reached with the same shared state.
>
> I'd recommend watching [1] and the demo at the end, and [2] for a demo
> using stateful functions on AWS lambda.
>
> [1] https://youtu.be/NF0hXZfUyqE
> [2] https://www.youtube.com/watch?v=tuSylBadNSo
>
> It seems like you are on the correct path!
> Good luck!
> Igal.
>
>

Akka version conflict running on Flink cluster

2018-06-11 Thread Wouter Zorgdrager
Hi,

I think I'm running into an Akka version conflict when running a Flink job
on a cluster.

The current situation:
- Flink cluster on Flink 1.4.2 (using Docker)
- Flink job which uses twitter4s [1] library and Akka version 2.5.8

In my Flink job I try to 'shutdown' an Akka actor from the twitter4s
library.
This results in a whole taskmanager crashing with the following stacktrace:

taskrunner_1  | 2018-06-11 09:03:14,454 INFO
org.apache.flink.runtime.taskmanager.TaskManager  -
Un-registering task and sending final execution state CANCELED to
JobManager for task Source: Custom Source -> Sink: Unnamed
(0ba7f7f259eee06fe2f7d783c868179b)
taskrunner_1  | Uncaught error from thread
[twitter4s-streaming-akka.actor.default-dispatcher-288]: loader constraint
violation: when resolving method
"akka.actor.ActorCell$$anonfun$3.(Lakka/actor/ActorCell;)V" the class
loader (instance of
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
of the current class, akka/actor/ActorCell, and the class loader (instance
of sun/misc/Launcher$AppClassLoader) for the method's defining class,
akka/actor/ActorCell$$anonfun$3, have different Class objects for the type
akka/actor/ActorCell used in the signature, shutting down JVM since
'akka.jvm-exit-on-fatal-error' is enabled for for
ActorSystem[twitter4s-streaming]
taskrunner_1  | java.lang.LinkageError: loader constraint violation: when
resolving method
"akka.actor.ActorCell$$anonfun$3.(Lakka/actor/ActorCell;)V" the class
loader (instance of
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader)
of the current class, akka/actor/ActorCell, and the class loader (instance
of sun/misc/Launcher$AppClassLoader) for the method's defining class,
akka/actor/ActorCell$$anonfun$3, have different Class objects for the type
akka/actor/ActorCell used in the signature
taskrunner_1  | at akka.actor.ActorCell.invoke(ActorCell.scala:499)
taskrunner_1  | at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
taskrunner_1  | at akka.dispatch.Mailbox.run(Mailbox.scala:224)
taskrunner_1  | at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
taskrunner_1  | at
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
taskrunner_1  | at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
taskrunner_1  | at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
taskrunner_1  | at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
taskrunner_1  | 2018-06-11 09:03:14,984 INFO
org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting
down BLOB cache
taskrunner_1  | 2018-06-11 09:03:14,985 INFO
org.apache.flink.runtime.blob.TransientBlobCache  - Shutting
down BLOB cache
taskrunner_1  | Exception in thread "twitter4s-streaming-shutdown-hook-1"
java.lang.NoClassDefFoundError:
akka/actor/CoordinatedShutdown$$anonfun$totalTimeout$1
taskrunner_1  | at
akka.actor.CoordinatedShutdown.totalTimeout(CoordinatedShutdown.scala:515)
taskrunner_1  | at
akka.actor.CoordinatedShutdown$$anonfun$initJvmHook$1.apply(CoordinatedShutdown.scala:217)
taskrunner_1  | at
akka.actor.CoordinatedShutdown$$anon$2.run(CoordinatedShutdown.scala:547)
taskrunner_1  | Caused by: java.lang.ClassNotFoundException:
akka.actor.CoordinatedShutdown$$anonfun$totalTimeout$1
taskrunner_1  | at
java.net.URLClassLoader.findClass(URLClassLoader.java:381)
taskrunner_1  | at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
taskrunner_1  | at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
taskrunner_1  | at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
taskrunner_1  | ... 3 more

To me, it looks like a version conflict. Any suggestions on how to solve this?

Thanks!
Wouter

[1] - Twitter4s:
https://github.com/DanielaSfregola/twitter4s/blob/master/build.sbt


Avro serialization and deserialization to Kafka in Scala

2019-02-07 Thread Wouter Zorgdrager
Hello all,


I saw the recent updates in Flink related to supporting Avro schema evolution
in state. I'm curious how Flink handles this internally for Scala case classes.
I'm working on custom (de-)serialization schemas to write to and read from Kafka.
However, I'm currently stuck because Avro doesn't natively support Scala. This
means that in order to support case class serialization using Scala-specific
types (like Option, Either, etc.) I need a library like Avro4s [1] or
AvroHugger [2], which generates schemas at compile time using macros. These
macro extensions are extremely slow for complex case classes (a compile time of
15 minutes for a few nested types). I'm looking for an approach that doesn't
use these libraries and am therefore curious how Flink handles this.


Does anyone have some good leads for this?


Thanks in advance!


Kind regards,
Wouter Zorgdrager


[1] https://github.com/sksamuel/avro4s
[2] https://github.com/julianpeeters/avrohugger



Using Flink in an university course

2019-03-04 Thread Wouter Zorgdrager
Hi all,

I'm working on a setup to use Apache Flink in an assignment for a Big Data
(bachelor) university course and I'm interested in your view on this. To
sketch the situation:
- more than 200 students follow this course
- students have to write some (simple) Flink applications using the
DataStream API; the focus is on writing the transformation code
- students need to write Scala code
- we provide a dataset and a template (Scala class) with function
signatures and a detailed description per application,
e.g.: def assignment_one(input: DataStream[Event]): DataStream[(String, Int)] = ???
- we provide some setup code, like parsing of data and setting up the
streaming environment
- assignments need to be auto-graded, based on correct results

In last year's edition of the course we approached this with a custom Docker
container. This container first compiled the students' code, ran all the
Flink applications against a different dataset, and then verified the output
against our solutions. This was turned into a grade and reported back to
the student. Although this was a working approach, I think we can do better.

I'm wondering if any of you have experience with using Apache Flink in a
university course (or have seen this somewhere) as well as assessing Flink
code.

Thanks a lot!

Kind regards,
Wouter Zorgdrager


Re: Using Flink in an university course

2019-03-04 Thread Wouter Zorgdrager
Hey all,

Thanks for the replies. The issues we were running into (which are not
specific to Docker):
- Students who changed the template incorrectly caused the container to fail.
- We give full points if the output matches our solutions (and none
otherwise), but it would be nice if we could give partial grades per
assignment (and better feedback). This would require looking not only at the
results but also at the operators used. The pitfall is that in many cases a
correct solution can be achieved in multiple ways. I came across a Flink
test library [1] which allows testing Flink code more extensively, but it
seems to be Java-only.

In retrospect, I do think using Docker is a good approach, as Fabian
confirms. However, the way we currently assess student solutions might be
improved. I assume that in your trainings manual feedback is given, but
unfortunately this is quite difficult for so many students.

Cheers,
Wouter

1: https://github.com/ottogroup/flink-spector
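
For the partial-grade idea, one output-based option (an illustration only, not what the course used) is to score the overlap between the expected and the produced records instead of requiring an exact match. The sketch below assumes each application's output can be collected as a list of `(String, Int)`-style tuples; `partial_score` is a hypothetical helper, and it deliberately ignores ordering and which operators were used.

```python
from collections import Counter
from typing import Iterable, Tuple

Record = Tuple[str, int]


def partial_score(expected: Iterable[Record], actual: Iterable[Record]) -> float:
    """Fraction of records that match, treating both outputs as multisets."""
    expected_counts = Counter(expected)
    actual_counts = Counter(actual)
    if not expected_counts and not actual_counts:
        return 1.0
    # Counter intersection keeps the minimum count per record
    matched = sum((expected_counts & actual_counts).values())
    # Dividing by the larger output penalises both missing and extra records
    total = max(sum(expected_counts.values()), sum(actual_counts.values()))
    return matched / total


solution = [("a", 1), ("b", 2), ("c", 3), ("d", 4)]
submission = [("a", 1), ("b", 2), ("c", 9), ("d", 4)]
print(partial_score(solution, submission))
```

A score like this still cannot tell why records are wrong, so it complements rather than replaces feedback on the operators used.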


On Mon, 4 Mar 2019 at 14:39, Fabian Hueske wrote:

> Hi Wouter,
>
> We are using Docker Compose (Flink JM, Flink TM, Kafka, Zookeeper) setups
> for our trainings and it is working very well.
> We have an additional container that feeds a Kafka topic via the
> command-line producer to simulate somewhat realistic behavior.
> Of course, you can do it without Kafka as well and use some kind of
> data-generating source that reads from a file that is replaced for
> evaluation.
>
> The biggest benefit that I see with using Docker is that the students have
> an environment for development and testing that is close to the grading
> situation.
> You do not need to provide infrastructure; everyone runs it locally in a
> well-defined context.
>
> So, as Joern said, what problems do you see with Docker?
>
> Best,
> Fabian
>
> On Mon, 4 Mar 2019 at 13:44, Jörn Franke <jornfra...@gmail.com> wrote:
>
>> It would help to understand the current issues that you have with this
>> approach. I used a similar approach (not with Flink, but with a similar big
>> data technology) some years ago.
>


Re: Using Flink in an university course

2019-03-06 Thread Wouter Zorgdrager
Hi all,

Thanks for the input. Much appreciated.

Regards,
Wouter

On Mon, 4 Mar 2019 at 20:40, Addison Higham wrote:

> Hi there,
>
> As far as a runtime for students goes, it seems like Docker is your best bet.
> However, you could instead have them package a jar using some interface
> (for example, see
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/packaging.html,
> which details the `Program` interface) and then execute it inside a custom
> runner. That *might* result in something less prone to breakage, as it would
> need to conform to an interface, but it may require a fair amount of custom
> code to reduce the boilerplate needed to build up a program plan, as well as
> the custom runner. The code for how Flink loads a jar and turns it into
> something it can execute is mostly encapsulated
> in org.apache.flink.client.program.PackagedProgram, which might be a good
> thing to read and understand if you go down this route.
>
> If you want to give more insight, you could build some tooling to traverse
> the underlying graphs that the students build up in their data stream
> application. For example, calling
> `StreamExecutionEnvironment.getStreamGraph` after the data stream is built
> will get a graph of the current job, which you can then traverse to see
> which operators and edges are in use. This is very similar to the process
> Flink uses to build the job DAG it renders in the UI. I am not sure what you
> could do as an automated analysis, but the StreamGraph API is quite low
> level and exposes a lot of information about the program.
>
> Hopefully that is a little bit helpful. Good luck, and it sounds like a fun
> course!
>
>

Challenges using Flink REST API

2019-03-13 Thread Wouter Zorgdrager
 an option, because the (main) idea behind
this framework is to reduce the boilerplate and cumbersome work of setting up
complex stream processing architectures.

Any help is appreciated. Thanks in advance!

Kind regards,
Wouter Zorgdrager


Re: Challenges using Flink REST API

2019-03-13 Thread Wouter Zorgdrager
Hi Chesnay,

Unfortunately, that is not the case when I run the Flink 1.7.2 Docker images.
The response is still:
{
  "errors": [
    "org.apache.flink.client.program.ProgramInvocationException: The main method caused an error."
  ]
}

Regards,
Wouter Zorgdrager

On Wed, 13 Mar 2019 at 10:42, Chesnay Schepler wrote:

> You should get the full stacktrace if you upgrade to 1.7.2 .
>
>
> On 13.03.2019 09:55, Wouter Zorgdrager wrote:
>
> Hey all!
>
> I'm looking for some advice on the following: I'm working on an
> abstraction on top of Apache Flink to 'pipeline' Flink applications using
> Kafka. For deployment this means that all these Flink jobs are embedded
> into one jar and each job is started using a program argument (e.g.
> "--stage 'FirstFlinkJob'"). To ease deploying a set of interconnected Flink
> jobs onto a cluster, I wrote a Python script which basically communicates
> with the REST API of the JobManager. So you can do things like "pipeline
> start --jar 'JarWithThePipeline.jar'" and this would deploy every Flink
> application separately.
>
> However, this script was written a while ago against Flink version
> "1.4.2". This week I tried to upgrade it to the latest Flink version, but I
> noticed a change in the REST responses. In order to get the "pipeline
> start" command working, we need to know all the Flink jobs that are in the
> jar (we call these Flink jobs 'stages') because we need the stage names as
> arguments for the jar. For the 1.4.2 version we used a dirty trick:
> we ran the jar with '--list --asException' as program arguments, which
> basically runs the jar file and immediately throws an exception with the
> stage names. These are then parsed and used to start every stage
> separately. The error message that Flink threw looked something like this:
>
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkException: Could not run the jar.
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
> ... 9 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
> main method caused an error.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
> at
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> at
> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
> at
> org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
> ... 8 more
> Caused by: org.codefeedr.pipeline.PipelineListException:
> ["org.codefeedr.plugin.twitter.stages.TwitterStatusInput","mongo_tweets","elasticsearch_tweets"]
> at org.codefeedr.pipeline.Pipeline.showList(Pipeline.scala:114)
> at org.codefeedr.pipeline.Pipeline.start(Pipeline.scala:100)
> at nl.wouterr.Main$.main(Main.scala:23)
> at nl.wouterr.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>
> However, for 1.7.0 this trick doesn't work anymore because instead of
> returning the full stack trace, it only returns the following:
> org.apache.flink.client.program.ProgramInvocationException: The program
> caused an error:
>
> In the c

Re: Challenges using Flink REST API

2019-03-13 Thread Wouter Zorgdrager
Hey Chesnay,

Actually, I was mistaken in stating that the JobManager logs contain the
full stack trace. This is all I get there:
2019-03-13 11:55:13,906 ERROR
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Exception
occurred in REST handler:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.

After some googling I came across Jira issue [1], which seems to fix my
issue in 1.8.0. I was still confused about why this ever worked for me in
1.4.2; after checking some binaries I found that the REST API was reworked
for 1.5.0 [2], which removed the full stack trace.

Is there an (official) Docker image available yet for running Flink 1.8?

Thanks,
Wouter

[1]: https://jira.apache.org/jira/browse/FLINK-11423
[2]: https://jira.apache.org/jira/browse/FLINK-7715
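For readers following the stage-listing trick from this thread (run the jar
with '--list --asException' and read the stage names out of the exception
that comes back), here is a minimal sketch of the parsing step. The actual
script is Python and is not shown in the thread, so this Scala version, the
object name StageList and the regex are assumptions for illustration only.
It can only work when the REST response still contains the
PipelineListException line, which is exactly what went missing after 1.4.2.

```scala
object StageList {
  // Matches the JSON-style array that follows the exception name in the trace.
  private val Pattern = """PipelineListException:\s*\[(.*?)\]""".r

  /** Returns the stage names found in a stack trace, or Nil if none are present. */
  def parse(stackTrace: String): List[String] =
    Pattern.findFirstMatchIn(stackTrace) match {
      case Some(m) =>
        m.group(1)
          .split(",")
          .toList
          .map(_.trim.stripPrefix("\"").stripSuffix("\""))
          .filter(_.nonEmpty)
      case None => Nil
    }
}
```

With the shortened 1.7.x response ("The main method caused an error.") the
function returns an empty list, so a caller can tell that the stage names
were not delivered.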


On Wed, 13 Mar 2019 at 12:18, Chesnay Schepler wrote:

> Can you give me the stacktrace that is logged in the JobManager logs?
>
>
> On 13.03.2019 10:57, Wouter Zorgdrager wrote:
>
> Hi Chesnay,
>
> Unfortunately this is not true when I run the Flink 1.7.2 docker images.
> The response is still:
> {
> "errors": [
> "org.apache.flink.client.program.ProgramInvocationException: The
> main method caused an error."
> ]
> }
>
> Regards,
> Wouter Zorgdrager
>
> On Wed, 13 Mar 2019 at 10:42, Chesnay Schepler wrote:
>
>> You should get the full stacktrace if you upgrade to 1.7.2 .
>>
>>
>> On 13.03.2019 09:55, Wouter Zorgdrager wrote:
>>
>> Hey all!
>>
>> I'm looking for some advice on the following; I'm working on an
>> abstraction on top of Apache Flink to 'pipeline' Flink applications using
>> Kafka. For deployment this means that all these Flink jobs are embedded
>> into one jar and each job is started using an program argument (e.g.
>> "--stage 'FirstFlinkJob'". To ease deploying a set of interconnected Flink
>> jobs onto a cluster I wrote a Python script which basically communicates
>> with the REST client of the JobManager. So you can do things like "pipeline
>> start --jar 'JarWithThePipeline.jar'" and this would deploy every Flink
>> application separately.
>>
>> However, this script was written a while ago against Flink version
>> "1.4.2". This week I tried to upgrade it to Flink latest version but I
>> noticed a change in the REST responses. In order to get the "pipeline
>> start" command working,we need to know all the Flink jobs that are in the
>> jar (we call these Flink jobs 'stages') because we need to know the stage
>> names as argument for the jar. For the 1.4.2 version we used a dirty trick;
>> we ran the jar with '--list --asException' as program arguments which
>> basically runs the jar file and immediately throws an exception with the
>> stage names. These are then parsed and used to start every stage
>> separately. The error message that Flink threw looked something like this:
>>
>> java.util.concurrent.CompletionException:
>> org.apache.flink.util.FlinkException: Could not run the jar.
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
>> ... 9 more
>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>> The main method caused an error.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>> at
>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>> at
>> org.apache.flink.client.program.ClusterClie

Case class field limit

2019-03-22 Thread Wouter Zorgdrager
Hey all,

Since Scala 2.11 the number of fields in a case class is no longer
restricted to 22 [1]. I was wondering whether Flink still uses this limit
internally, because the documentation [2] also mentions a maximum of 22
fields. However, I just ran a simple test setup with a case class of more
than 22 fields and it worked fine. Is the documentation outdated or am I
missing something?

Hope someone can clarify!

Cheers,
Wouter


[1] - https://github.com/scala/bug/issues/7296
[2] -
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/types_serialization.html#flinks-typeinformation-class
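
To make the Scala side of the question concrete, here is a minimal sketch of
a case class with 23 fields, which compiles on Scala 2.11 and later. The
names WideCaseClass and Wide are made up for illustration. It only shows that
the language accepts such a class; it says nothing about how Flink derives
type information for it.

```scala
object WideCaseClass {
  // 23 fields: one more than the old limit of 22.
  case class Wide(
    f1: Int, f2: Int, f3: Int, f4: Int, f5: Int, f6: Int, f7: Int, f8: Int,
    f9: Int, f10: Int, f11: Int, f12: Int, f13: Int, f14: Int, f15: Int, f16: Int,
    f17: Int, f18: Int, f19: Int, f20: Int, f21: Int, f22: Int, f23: Int)

  val sample: Wide = Wide(
    1, 2, 3, 4, 5, 6, 7, 8,
    9, 10, 11, 12, 13, 14, 15, 16,
    17, 18, 19, 20, 21, 22, 23)
}
```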


Re: Case class field limit

2019-03-22 Thread Wouter Zorgdrager
Done! https://issues.apache.org/jira/browse/FLINK-11996

On Fri, 22 Mar 2019 at 11:52, Chesnay Schepler wrote:

> It is likely that the documentation is outdated. Could you open a JIRA for
> updating the documentation?
>
> On 22/03/2019 10:12, Wouter Zorgdrager wrote:
> > Hey all,
> >
> > Since Scala 2.11 the amount of fields in a case class isn't restricted
> > to 22 anymore [1]. I was wondering if Flink still uses this limit
> > internally, if I check the documentation [2] I also see a max of 22
> > fields. However, I just ran a simple test setup with a case class > 22
> > fields and this worked fine. Is the documentation outdated or am I
> > missing something?
> >
> > Hope someone can clarify!
> >
> > Cheers,
> > Wouter
> >
> >
> > [1] - https://github.com/scala/bug/issues/7296
> > [2] -
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/types_serialization.html#flinks-typeinformation-class
> >
>
>
>


Re: Read mongo datasource in Flink

2019-04-29 Thread Wouter Zorgdrager
For a framework I'm working on, we actually implemented a (basic) Mongo
source [1]. It's written in Scala and uses Json4s [2] to parse the data
into a case class. It uses a Mongo observer to iterate over a collection
and emit it into a Flink context.

Cheers,
Wouter

[1]:
https://github.com/codefeedr/codefeedr/blob/develop/codefeedr-plugins/codefeedr-mongodb/src/main/scala/org/codefeedr/plugins/mongodb/BaseMongoSource.scala

[2]: http://json4s.org/

On Mon, 29 Apr 2019 at 13:57, Flavio Pompermaier wrote:

> I'm not aware of an official source/sink..if you want you could try to
> exploit the Mongo HadoopInputFormat as in [1].
> The provided link use a pretty old version of Flink but it should not be a
> big problem to update the maven dependencies and the code to a newer
> version.
>
> Best,
> Flavio
>
> [1] https://github.com/okkam-it/flink-mongodb-test
>
> On Mon, Apr 29, 2019 at 6:15 AM Hai  wrote:
>
>> Hi,
>>
>>
>> Can anyone give me a clue about how to read mongodb’s data as a
>> batch/streaming datasource in Flink? I don’t find the mongodb connector in
>> recent release version .
>>
>>
>> Many thanks
>>
>
>


Re: Read mongo datasource in Flink

2019-04-29 Thread Wouter Zorgdrager
Yes, that is correct. This is a really basic implementation that doesn't
take parallelism into account. I think you need something like this [1] to
get that working.

[1]:
https://docs.mongodb.com/manual/reference/command/parallelCollectionScan/#dbcmd.parallelCollectionScan

On Mon, 29 Apr 2019 at 14:37, Flavio Pompermaier wrote:

> But what about parallelism with this implementation? From what I see
> there's only a single thread querying Mongo and fetching all the data..am I
> wrong?
>
> On Mon, Apr 29, 2019 at 2:05 PM Wouter Zorgdrager <
> w.d.zorgdra...@tudelft.nl> wrote:
>
>> For a framework I'm working on, we actually implemented a (basic) Mongo
>> source [1]. It's written in Scala and uses Json4s [2] to parse the data
>> into a case class. It uses a Mongo observer to iterate over a collection
>> and emit it into a Flink context.
>>
>> Cheers,
>> Wouter
>>
>> [1]:
>> https://github.com/codefeedr/codefeedr/blob/develop/codefeedr-plugins/codefeedr-mongodb/src/main/scala/org/codefeedr/plugins/mongodb/BaseMongoSource.scala
>>
>> [2]: http://json4s.org/
>>
>> On Mon, 29 Apr 2019 at 13:57, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> I'm not aware of an official source/sink..if you want you could try to
>>> exploit the Mongo HadoopInputFormat as in [1].
>>> The provided link use a pretty old version of Flink but it should not be
>>> a big problem to update the maven dependencies and the code to a newer
>>> version.
>>>
>>> Best,
>>> Flavio
>>>
>>> [1] https://github.com/okkam-it/flink-mongodb-test
>>>
>>> On Mon, Apr 29, 2019 at 6:15 AM Hai  wrote:
>>>
>>>> Hi,
>>>>
>>>>
>>>> Can anyone give me a clue about how to read mongodb’s data as a
>>>> batch/streaming datasource in Flink? I don’t find the mongodb connector in
>>>> recent release version .
>>>>
>>>>
>>>> Many thanks
>>>>
>>>
>>>
>


Preserve accumulators after failure in DataStream API

2019-04-30 Thread Wouter Zorgdrager
Hi all,

In the documentation I read about UDF accumulators [1] "Accumulators are
automatically backup-ed by Flink’s checkpointing mechanism and restored in
case of a failure to ensure exactly-once semantics." So I assumed this was
also the case for accumulators used in the DataStream API, but I noticed
that it isn't. Every time my job crashes and restarts, the accumulator is
reset. Is there a way to retain this information?

Thanks,
Wouter


[1].
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html


Re: Preserve accumulators after failure in DataStream API

2019-05-02 Thread Wouter Zorgdrager
Hi Fabian,

Maybe I should clarify a bit: I'm actually using a (Long)Counter registered
as an Accumulator in the RuntimeContext [1]. So I'm using a
KeyedProcessFunction, not an AggregateFunction. This works properly, but the
value is not retained after a job restart. I'm not entirely sure whether I
did this correctly.

Thx,
Wouter


[1].
https://github.com/codefeedr/ghtorrent_mirror/blob/01e5cde837342993c7d287c60e40762e98f8d010/src/main/scala/org/codefeedr/experimental/stats/CommitsStatsProcess.scala#L34



On Thu, 2 May 2019 at 09:36, Fabian Hueske wrote:

> Hi Wouter,
>
> The DataStream API accumulators of the AggregateFunction [1] are stored in
> state and should be recovered in case of a failure as well.
> If this does not work, it would be a serious bug.
>
> What's the type of your accumulator?
> Can you maybe share the code?
> How to you apply the AggregateFunction (window, windowAll, ...)?
>
> Thanks,
> Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>
> On Tue, 30 Apr 2019 at 13:19, Wouter Zorgdrager <
> w.d.zorgdra...@tudelft.nl> wrote:
>
>> Hi all,
>>
>> In the documentation I read about UDF accumulators [1] "Accumulators are
>> automatically backup-ed by Flink’s checkpointing mechanism and restored in
>> case of a failure to ensure exactly-once semantics." So I assumed this
>> also was the case of accumulators used in the DataStream API, but I noticed
>> that it isn't. So every time my jobs crashes and restarts, the accumulator
>> is reset. Is there a way to retain this information?
>>
>> Thanks,
>> Wouter
>>
>>
>> [1].
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
>>
>


Re: Preserve accumulators after failure in DataStream API

2019-05-02 Thread Wouter Zorgdrager
+1, especially if you don't want to rely on an external metric reporter,
this would be a nice feature.
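
For anyone landing on this thread later: the suggestion quoted below is to
use managed state instead of RuntimeContext accumulators. A minimal sketch of
what that could look like for a per-key counter in a KeyedProcessFunction
follows. The Commit type, the CommitCounter class and the state name
"commit-count" are made up for illustration and are not taken from the linked
code. The sketch assumes the Flink streaming dependency is on the classpath
and that checkpointing is enabled for the job.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event type; the thread does not show the real one.
case class Commit(repo: String)

// Counts elements per key in managed keyed state, which Flink checkpoints
// and restores after a failure.
class CommitCounter extends KeyedProcessFunction[String, Commit, (String, Long)] {

  @transient private var count: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("commit-count", classOf[java.lang.Long]))
  }

  override def processElement(
      value: Commit,
      ctx: KeyedProcessFunction[String, Commit, (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {
    // value() returns null for a key that has no state yet.
    val current = Option(count.value()).map(_.longValue).getOrElse(0L)
    val updated = current + 1
    count.update(updated)
    out.collect((ctx.getCurrentKey, updated))
  }
}
```

Unlike an accumulator, this count is kept per key and does not show up in the
job's accumulator results, so it has to be emitted downstream (as done here)
to be observed.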

On Thu, 2 May 2019 at 10:29, Fabian Hueske wrote:

> Hi,
>
> Both of you seem to have the same requirement.
> This is a good indication that "fault-tolerant metrics" are a missing
> feature.
> It might make sense to think about a built-in mechanism to back metrics
> with state.
>
> Cheers,
> Fabian
>
>
>
> On Thu, 2 May 2019 at 10:25, Paul Lam wrote:
>
>> Hi Wouter,
>>
>> I've met the same issue and finally managed to use operator states to
>> back the accumulators, so they can be restored after restarts.
>> The downside is that we have to update the values in both accumulators
>> and states to make them consistent. FYI.
>>
>> Best,
>> Paul Lam
>>
>> On Thu, 2 May 2019 at 16:17, Fabian Hueske wrote:
>>
>>> Hi Wouter,
>>>
>>> OK, that explains it :-) Overloaded terms...
>>>
>>> The Table API / SQL documentation refers to the accumulator of an
>>> AggregateFunction [1].
>>> The accumulators that are accessible via the RuntimeContext are a rather
>>> old part of the API that is mainly intended for batch jobs.
>>>
>>> I would not use them for streaming applications as they are not
>>> checkpointed and recovered (as you noticed).
>>> You should use managed state (keyed or operator) for such use cases.
>>>
>>> Best,
>>> Fabian
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AggregateFunction.java
>>>
>>> On Thu, 2 May 2019 at 10:01, Wouter Zorgdrager <
>>> w.d.zorgdra...@tudelft.nl> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> Maybe I should clarify a bit, actually I'm using a (Long)Counter
>>>> registered as Accumulator in the RuntimeContext [1]. So I'm using a
>>>> KeyedProcessFunction, not an AggregateFunction. This works property, but is
>>>> not retained after a job restart. I'm not entirely sure if I did this
>>>> correct.
>>>>
>>>> Thx,
>>>> Wouter
>>>>
>>>>
>>>> [1].
>>>> https://github.com/codefeedr/ghtorrent_mirror/blob/01e5cde837342993c7d287c60e40762e98f8d010/src/main/scala/org/codefeedr/experimental/stats/CommitsStatsProcess.scala#L34
>>>>
>>>>
>>>>
>>>> On Thu, 2 May 2019 at 09:36, Fabian Hueske wrote:
>>>>
>>>>> Hi Wouter,
>>>>>
>>>>> The DataStream API accumulators of the AggregateFunction [1] are
>>>>> stored in state and should be recovered in case of a failure as well.
>>>>> If this does not work, it would be a serious bug.
>>>>>
>>>>> What's the type of your accumulator?
>>>>> Can you maybe share the code?
>>>>> How to you apply the AggregateFunction (window, windowAll, ...)?
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>>>>>
>>>>> On Tue, 30 Apr 2019 at 13:19, Wouter Zorgdrager <
>>>>> w.d.zorgdra...@tudelft.nl> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> In the documentation I read about UDF accumulators [1] "Accumulators
>>>>>> are automatically backup-ed by Flink’s checkpointing mechanism and
>>>>>> restored in case of a failure to ensure exactly-once semantics." So I
>>>>>> assumed
>>>>>> this also was the case of accumulators used in the DataStream API, but I
>>>>>> noticed that it isn't. So every time my jobs crashes and restarts, the
>>>>>> accumulator is reset. Is there a way to retain this information?
>>>>>>
>>>>>> Thanks,
>>>>>> Wouter
>>>>>>
>>>>>>
>>>>>> [1].
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
>>>>>>
>>>>>


Flink and Prometheus setup in K8s

2019-05-13 Thread Wouter Zorgdrager
Hey all,

I'm working on a deployment setup with Flink and Prometheus on Kubernetes.
I'm running into the following issues:

1) Is it possible to use the default Flink Docker image [1] and enable the
Prometheus reporter? Modifying flink-conf.yaml is easy, but somehow the
Prometheus reporter jar needs to be moved within the image. This is easy
if I use my own Dockerfile (as done here [2]), but I prefer using the
official one.
2) I can define the jobmanager/taskmanager metric endpoints statically, but
w.r.t. scaling I prefer to have these resolved/discovered dynamically. Did
anyone get a working setup on this? I came across this resource for YARN
[3], is there something similar for Kubernetes? Or are there any other ways
of configuring Prometheus to pick this up automatically?

Thanks a lot for your help!

Kind regards,
Wouter

[1]: https://hub.docker.com/_/flink/
[2]:
https://github.com/mbode/flink-prometheus-example/blob/master/Dockerfile
[3]: https://github.com/eastcirclek/flink-service-discovery


Re: Flink and Prometheus setup in K8s

2019-05-15 Thread Wouter Zorgdrager
Hi all,

To answer my own questions I worked on the following solution:

1) A custom Docker image which pulls the Flink image and moves the
Prometheus jar to the correct folder [1, 2].
2) I wrote manifests for Kubernetes with service discovery configuration
for Kubernetes [3]. Besides the 'official' Flink Kubernetes manifests, I
added a TM service which exposes all TM metrics locations so that
Prometheus can scrape them. This means that (re)scaled Flink TMs are
automatically picked up by Prometheus. The repository also includes a
Grafana setup with a simple dashboard.

I thought this might be useful for other users!

Cheers,
Wouter

[1]: https://github.com/wzorgdrager/flink-k8s/blob/master/docker/Dockerfile
[2]: https://hub.docker.com/r/wzorgdrager/flink-prometheus
[3]: https://github.com/wzorgdrager/flink-k8s

On Mon, 13 May 2019 at 14:16, Wouter Zorgdrager <
w.d.zorgdra...@tudelft.nl> wrote:

> Hey all,
>
> I'm working on a deployment setup with Flink and Prometheus on Kubernetes.
> I'm running into the following issues:
>
> 1) Is it possible to use the default Flink Docker image [1] and enable the
> Prometheus reporter? Modifying the flink-config.yaml is easy, but somehow
> the Prometheus reporter jar needs to be moved within the image. This is
> easy if use my own Dockerfile (as done here [2]) , but I prefer using the
> official one.
> 2) I can define the jobmanager/taskmanager metric endpoints statically,
> but w.r.t. scaling I prefer to have these resolved/discovered dynamically.
> Did anyone get a working setup on this? I came across this resource for
> YARN [3], is there something similar for Kubernetes? Or are there any other
> ways of configuring Prometheus to pick this up automatically?
>
> Thanks a lot for your help!
>
> Kind regards,
> Wouter
>
> [1]: https://hub.docker.com/_/flink/
> [2]:
> https://github.com/mbode/flink-prometheus-example/blob/master/Dockerfile
> [3]: https://github.com/eastcirclek/flink-service-discovery
>


Re: Flink not giving full reason as to why job submission failed

2019-05-16 Thread Wouter Zorgdrager
Hi Harshith,

This was indeed an issue in 1.7.2, but fixed in 1.8.0. See the
corresponding Jira issue [1].

Cheers,
Wouter

[1]: https://issues.apache.org/jira/browse/FLINK-11902

On Thu, 16 May 2019 at 16:05, Kumar Bolar, Harshith wrote:

> Hi all,
>
>
>
> After upgrading Flink to 1.7.2, when I try to submit a job from the
> dashboard and there's some issue with the job, the job submission fails
> with the following error.
>
>
>
> Exception occurred in REST handler:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error.
>
> There's no other reason given as to why the job failed to submit. This was
> not the case in 1.4.2. Is there a way to see the full reason why the job
> failed to deploy? I see the same error in the logs too with no additional
> information.
>
> Thanks,
>
> Harshith
>


Re: Re: Flink not giving full reason as to why job submission failed

2019-05-20 Thread Wouter Zorgdrager
Hi Harshith,

This is indeed an issue not resolved in 1.8. I added a comment to the
(closed) Jira issue, so this might be fixed in future releases.

Cheers,
Wouter

On Mon, 20 May 2019 at 16:18, Kumar Bolar, Harshith wrote:

> Hi Wouter,
>
>
>
> I’ve upgraded Flink to 1.8, but now I only see Internal server error on
> the dashboard when a job deployment fails.
>
>
>
>
>
> But in the logs I see the correct exception -
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
>
>   at
> functions.PersistPIDMessagesToCassandra.main(PersistPIDMessagesToCassandra.java:59)
>
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>   at java.lang.reflect.Method.invoke(Method.java:498)
>
>   at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>
>   ... 9 more
>
>
>
> Is there any way to show these errors on the dashboard? Because many
> application teams deploy jobs through the dashboard and don’t have ready
> access to the logs.
>
>
>
> Thanks,
>
> Harshith
>
>
>
> *From: *Wouter Zorgdrager 
> *Date: *Thursday, 16 May 2019 at 7:56 PM
> *To: *Harshith Kumar Bolar 
> *Cc: *user 
> *Subject: *[External] Re: Flink not giving full reason as to why job
> submission failed
>
>
>
> Hi Harshith,
>
>
>
> This was indeed an issue in 1.7.2, but fixed in 1.8.0. See the
> corresponding Jira issue [1].
>
>
>
> Cheers,
>
> Wouter
>
>
>
> [1]: https://issues.apache.org/jira/browse/FLINK-11902
>
>
>
>
> On Thu, 16 May 2019 at 16:05, Kumar Bolar, Harshith wrote:
>
> Hi all,
>
>
>
> After upgrading Flink to 1.7.2, when I try to submit a job from the
> dashboard and there's some issue with the job, the job submission fails
> with the following error.
>
>
>
> Exception occurred in REST handler:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error.
>
> There's no other reason given as to why the job failed to submit. This was
> not the case in 1.4.2. Is there a way to see the full reason why the job
> failed to deploy? I see the same error in the logs too with no additional
> information.
>
> Thanks,
>
> Harshith
>
>


Unexpected behavior from interval join in Flink

2019-06-17 Thread Wouter Zorgdrager
Hi all,

I'm experiencing some unexpected behavior using an interval join in Flink.
I'm dealing with two data sets, let's call them X and Y. They are finite
(10k elements) but I interpret them as a DataStream. The data needs to be
joined for enrichment purposes. I use event time and I know (because I
generated the data myself) that the timestamp of an element Y is always
between -60 minutes and +30 minutes of the element with the same key in set
X. Both datasets are in-order (in terms of timestamps), equal in size,
share a common key and parallelism is set to 1 throughout the whole program.

The code to join looks something like this:

xStream
  .assignAscendingTimestamps(_.date.getTime)
  .keyBy(_.commonKey)
  .intervalJoin(
    yStream
      .assignAscendingTimestamps(_.date.getTime)
      .keyBy(_.commonKey))
  .between(Time.minutes(-60), Time.minutes(30))
  .process(new ProcessJoinFunction[X, Y, String] {
    override def processElement(
        left: X,
        right: Y,
        ctx: ProcessJoinFunction[X, Y, String]#Context,
        out: Collector[String]): Unit = {
      // Emit every matched pair as "left:right".
      out.collect(left + ":" + right)
    }
  })


However, about 30% of the data is not joined. Is there a proper way
to debug this? For instance, in windows you can side-output late data. Is
there a possibility to side-output unjoinable data?

Thx a lot,
Wouter


Re: Unexpected behavior from interval join in Flink

2019-06-21 Thread Wouter Zorgdrager
Does anyone have any leads on this issue? I have been looking into the
IntervalJoinOperator code, but that didn't really help. My intuition is
that elements are rejected because of lateness; however, that still
confuses me, since I'm sure that both datastreams have monotonically
increasing timestamps.

Thx, Wouter

On Mon, 17 Jun 2019 at 13:20, Wouter Zorgdrager <
w.d.zorgdra...@tudelft.nl> wrote:

> Hi all,
>
> I'm experiencing some unexpected behavior using an interval join in Flink.
> I'm dealing with two data sets, lets call them X and Y. They are finite
> (10k elements) but I interpret them as a DataStream. The data needs to be
> joined for enrichment purposes. I use event time and I know (because I
> generated the data myself) that the timestamp of an element Y is always
> between -60 minutes and +30 minutes of the element with the same key in set
> X. Both datasets are in-order (in terms of timestamps), equal in size,
> share a common key and parallelism is set to 1 throughout the whole program.
>
> The code to join looks something like this:
>
> xStream
>   .assignAscendingTimestamps(_.date.getTime)
>   .keyBy(_.commonKey)
>   .intervalJoin(
> yStream
>   .assignAscendingTimestamps(_.date.getTime)
>   .keyBy(_.commonKey))
>   .between(Time.minutes(-60), Time.minutes(30))
>   .process(new ProcessJoinFunction[X, Y, String] {
> override def processElement(
> left: X,
> right: Y,
> ctx: ProcessJoinFunction[X, Y, String]#Context,
> out: Collector[String]): Unit = {
>
>   out.collect(left + ":" + right)
> }
>
>
> However, about 30% percent of the data is not joined. Is there a proper
> way to debug this? For instance, in windows you can side-output late data.
> Is there a possibility to side-output unjoinable data?
>
> Thx a lot,
> Wouter
>
>
>


Re: Unexpected behavior from interval join in Flink

2019-06-24 Thread Wouter Zorgdrager
Hi Fabian,

Thanks for your reply. I managed to resolve this issue. Actually this
behavior was not so unexpected: I mistakenly used xStream as the 'base' of
the join, while I needed to use yStream as the 'base', i.e. yStream.element
- 60 min <= xStream.element <= yStream.element + 30 min. Interchanging both
datastreams fixed this issue.

Thanks anyway.

Cheers, Wouter
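
To illustrate why the order of the two streams matters, here is a small
self-contained model of the interval-join condition as I understand it from
the Flink documentation: the bounds are applied relative to the left
('base') stream, and both bounds are inclusive. This is plain Scala, not
Flink code; IntervalJoinModel, Event and the timestamps (in minutes) are
made up for illustration.

```scala
object IntervalJoinModel {
  final case class Event(key: String, ts: Long)

  /** Pairs (l, r) with equal keys where l.ts + lower <= r.ts <= l.ts + upper. */
  def intervalJoin(
      left: Seq[Event],
      right: Seq[Event],
      lower: Long,
      upper: Long): Seq[(Event, Event)] =
    for {
      l <- left
      r <- right
      if l.key == r.key && r.ts >= l.ts + lower && r.ts <= l.ts + upper
    } yield (l, r)

  // x lies 50 minutes before y, so y - 60 <= x <= y + 30 holds.
  val xs: Seq[Event] = Seq(Event("k", 50L))
  val ys: Seq[Event] = Seq(Event("k", 100L))

  // x as the base: requires x - 60 <= y <= x + 30, which fails for this pair.
  val xAsBase: Seq[(Event, Event)] = intervalJoin(xs, ys, -60L, 30L)

  // y as the base: requires y - 60 <= x <= y + 30, which holds.
  val yAsBase: Seq[(Event, Event)] = intervalJoin(ys, xs, -60L, 30L)
}
```

With the same bounds, the pair is dropped when x is the base and matched
when y is the base, which is the kind of silent loss described in this
thread.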



On Mon, 24 Jun 2019 at 11:22, Fabian Hueske wrote:

> Hi Wouter,
>
> Not sure what is going wrong there, but something that you could try is to
> use a custom watermark assigner and always return a watermark of 0.
> When the source finished serving the watermarks, it emits a final
> Long.MAX_VALUE watermark.
> Hence the join should consume all events and store them in state. When
> both sources are finished, it would start to join the data and clean up the
> state.
> This test would show if there are any issue with late data.
>
> Best, Fabian
>
> On Fri, 21 Jun 2019 at 15:32, Wouter Zorgdrager <
> w.d.zorgdra...@tudelft.nl> wrote:
>
>> Anyone some leads on this issue? Have been looking into the
>> IntervalJoinOperator code, but that didn't really help. My intuition is
>> that it is rejected because of lateness, however that still confuses me
>> since I'm sure that both datastreams have monotonic increasing timestamps.
>>
>> Thx, Wouter
>>
>> On Mon, 17 Jun 2019 at 13:20, Wouter Zorgdrager <
>> w.d.zorgdra...@tudelft.nl> wrote:
>>
>>> Hi all,
>>>
>>> I'm experiencing some unexpected behavior using an interval join in
>>> Flink.
>>> I'm dealing with two data sets, lets call them X and Y. They are finite
>>> (10k elements) but I interpret them as a DataStream. The data needs to be
>>> joined for enrichment purposes. I use event time and I know (because I
>>> generated the data myself) that the timestamp of an element Y is always
>>> between -60 minutes and +30 minutes of the element with the same key in set
>>> X. Both datasets are in-order (in terms of timestamps), equal in size,
>>> share a common key and parallelism is set to 1 throughout the whole program.
>>>
>>> The code to join looks something like this:
>>>
>>> xStream
>>>   .assignAscendingTimestamps(_.date.getTime)
>>>   .keyBy(_.commonKey)
>>>   .intervalJoin(
>>> yStream
>>>   .assignAscendingTimestamps(_.date.getTime)
>>>   .keyBy(_.commonKey))
>>>   .between(Time.minutes(-60), Time.minutes(30))
>>>   .process(new ProcessJoinFunction[X, Y, String] {
>>> override def processElement(
>>> left: X,
>>> right: Y,
>>> ctx: ProcessJoinFunction[X, Y, String]#Context,
>>> out: Collector[String]): Unit = {
>>>
>>>   out.collect(left + ":" + right)
>>> }
>>>
>>>
>>> However, about 30% percent of the data is not joined. Is there a proper
>>> way to debug this? For instance, in windows you can side-output late data.
>>> Is there a possibility to side-output unjoinable data?
>>>
>>> Thx a lot,
>>> Wouter
>>>
>>>
>>>


RichAsyncFunction in Scala

2018-01-31 Thread Wouter Zorgdrager
Hi,

Currently there is no way of using the RichAsyncFunction in Scala, which
means I can't get access to the RuntimeContext. I know someone is working
on this: https://issues.apache.org/jira/browse/FLINK-6756 . However, is
there a workaround in the meantime? I'm particularly interested in
getting the index of the subtask in my AsyncFunction.

Regards,
Wouter


KafkaProducer with generic (Avro) serialization schema

2018-04-25 Thread Wouter Zorgdrager
Dear reader,

I'm currently working on writing a KafkaProducer which is able to serialize
a generic type using avro4s.
However this serialization schema is not serializable itself. Here is my
code for this:

The serialization schema:
class AvroSerializationSchema[IN : SchemaFor : FromRecord : ToRecord]
    extends SerializationSchema[IN] {

  override def serialize(element: IN): Array[Byte] = {
    val byteArray = new ByteArrayOutputStream()
    val avroSer = AvroOutputStream.binary[IN](byteArray)
    avroSer.write(element)
    avroSer.flush()
    avroSer.close()

    byteArray.toByteArray
  }
}

The job code:
case class Person(name: String, age: Int, address: Address)
case class Address(city: String, street: String)

class SimpleJob {

  @transient
  private lazy val serSchema: AvroSerializationSchema[Person] =
    new AvroSerializationSchema[Person]()

  def start() = {
    val testPerson = Person("Test", 100, Address("Test", "Test"))

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .fromCollection(Seq(testPerson))
      .addSink(createKafkaSink())

    env.execute("Flink sample job")
  }

  def createKafkaSink(): RichSinkFunction[Person] = {
    // Set some properties.
    val properties = new Properties()
    properties.put("bootstrap.servers", "127.0.0.1:9092")
    properties.put("zookeeper.connect", "127.0.0.1:2181")

    new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
  }

}

The code does compile, but it gives the following error at
runtime: InvalidProgramException: Object
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
is not serializable.

I assume this means that my custom SerializationSchema is not serializable
due to the use of SchemaFor, FromRecord and ToRecord.
Does anyone know a solution or workaround?

Thanks in advance!
Wouter


Re: KafkaProducer with generic (Avro) serialization schema

2018-04-26 Thread Wouter Zorgdrager
Hi Bill,

Thanks for your answer. However, this proposal isn't going to solve my
issue, since the problem here is that the context bounds I need in order
to serialize to Avro (SchemaFor, ToRecord and FromRecord) aren't
serializable classes. As a result, Flink is not able to serialize the
KafkaProducer, which fails the whole job.

Thanks,
Wouter
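
As a debugging aid for this kind of error, a schema instance can be pushed
through plain Java serialization before it is handed to Flink; as far as I
know this is the same mechanism Flink uses when it reports "is not
serializable". The sketch below is an assumption-level helper, not part of
Flink or avro4s; SerializableCheck and firstOffender are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializableCheck {
  /**
   * Returns None if obj survives Java serialization, otherwise the message
   * of the NotSerializableException, which names the offending class.
   */
  def firstOffender(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    } finally {
      out.close()
    }
  }
}
```

Calling SerializableCheck.firstOffender on an AvroSerializationSchema[Person]
instance should point at whichever captured type class instance blocks
serialization, without having to start a job.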

On Thu, 26 Apr 2018 at 00:42, Nortman, Bill wrote:

> The first thing I would try is to make sure your classes Person and Address
> have getters and setters and a no-argument constructor.

Re: KafkaProducer with generic (Avro) serialization schema

2018-05-01 Thread Wouter Zorgdrager
So, I'm still struggling with this issue. I dug a bit deeper into the
problem and I'm pretty sure the cause is that I have to (implicitly)
pass the SchemaFor and ToRecord instances to my serialization schema
(otherwise I can't make it generic). However, those classes aren't
serializable, and of course I can't annotate them as transient or make them
lazy vals, which gives me the current issue.

I hope someone has some leads for me.

Thanks!


Re: KafkaProducer with generic (Avro) serialization schema

2018-05-02 Thread Wouter Zorgdrager
Hi,

Thanks for the suggestions.

Unfortunately I cannot make FromRecord/ToRecord/SchemaFor serializable,
since those classes are out of my control. I use them from the avro4s
library (https://github.com/sksamuel/avro4s). The problem here, especially
with the deserializer, is that I need to convert an Avro 'GenericRecord' to
a Scala case class. Avro is written in Java, so that's a bit problematic, and
therefore I need the avro4s library. Avro4s tries to verify at compile time
whether the generic type is actually convertible from/to a generic record;
that is why I need those context bounds.

As for @Aljoscha's workaround, I don't understand how this would solve it.
Doesn't that just move the problem? If I create a factory, I still
need the generic type (with context bounds) that I specify at my
KafkaConsumer/DeserializationSchema.

@Fabian I'm not sure I understand your proposal. I still need the
context bounds for the compile-time macros of avro4s.

Once again, thanks for your help so far!

Regards,
Wouter



On Wed, 2 May 2018 at 16:48, Fabian Hueske wrote:

> Hi Wouter,
>
> you can try to make the SerializationSchema serializable by overriding
> Java's serialization methods writeObject() and readObject(), similar to what
> Flink's AvroRowSerializationSchema [1] does.
>
> Best, Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
>
> 2018-05-02 16:34 GMT+02:00 Piotr Nowojski :
>
>> Hi,
>>
>> My Scala knowledge is very limited (and my knowledge of Scala's
>> serialization is nonexistent), but one way or another you have to make your
>> SerializationSchema serializable. If this is indeed the problem, maybe a
>> better place to ask this question is Stack Overflow or some Scala-specific
>> mailing list/board (unless someone else from the Flink community
>> can provide an answer to this problem)?
>>
>> Piotrek

Side outputs PyFlink

2021-05-20 Thread Wouter Zorgdrager
Dear Flink community,

First of all, I'm very excited about the new 1.13 release. Among
other features, I'm particularly excited about the support of stateful
operations in Python. I think it will make the wonders of stream processing
and the power of Flink accessible to more developers.

I'm currently playing around a bit with these new features and I was
wondering if there are already plans to support side output in the Python
API? This already works pretty neatly in the DataStream API but I couldn't
find any communication on adding this to PyFlink.

In the meantime, what do you suggest for a workaround on side outputs?
Intuitively, I would copy a stream and add a filter for each side output
but this seems a bit inefficient. In that setup, each side output will need
to go over the complete stream. Any ideas?
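
One way to avoid repeating the routing logic per output, sketched here in plain Python rather than with the PyFlink API: decide the target output once per record, attach it as a tag, and let each branch do only a cheap tag comparison. The names `tag_records`, `select` and `classify` are hypothetical, and the lists stand in for streams.

```python
from typing import Callable, Iterable, List, Tuple


def tag_records(records: Iterable[int], classify: Callable[[int], str]) -> List[Tuple[str, int]]:
    """Single pass: run the (possibly expensive) routing logic once per record."""
    return [(classify(record), record) for record in records]


def select(tagged: Iterable[Tuple[str, int]], tag: str) -> List[int]:
    """Per-output branch: only compares the tag, then strips it again."""
    return [record for record_tag, record in tagged if record_tag == tag]


def classify(record: int) -> str:
    # Hypothetical routing rule standing in for real side-output logic.
    if record < 0:
        return "invalid"
    return "even" if record % 2 == 0 else "odd"


tagged = tag_records([4, -1, 7, 10], classify)
print(select(tagged, "even"), select(tagged, "odd"), select(tagged, "invalid"))
```

In a PyFlink job the same shape would be one map that emits (tag, value) followed by one filter per tag. Every branch still sees every record, so this reduces the work done per record rather than the number of records each branch reads.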

Thanks in advance!
Regards,
Wouter


PyFlink DataStream union type mismatch

2021-05-20 Thread Wouter Zorgdrager
Dear all,

I'm having trouble unifying two data streams using the union operator in
PyFlink. My code basically looks like this:

init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

print(init_stream)
print(init_stream.get_type())
print(stateful_operator_stream.get_type())
print(stateful_operator_stream)

final_operator_stream = (init_stream
    .union(stateful_operator_stream)
    .process(stateful_operator)
)


In short, I have a datastream (operator_stream) of type Tuple[str,
Event] which I define as a tuple of Types.STRING() and
Types.PICKLED_BYTE_ARRAY().
When calling the union operator, I get an error which shows a type
mismatch between both streams:

py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
: java.lang.IllegalArgumentException: Cannot union streams of different types: Java Tuple2 and Row(f0: String, f1: Java Tuple2)
    at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)

However, when I print the types of both datastreams they seem similar:


RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))

Any thoughts? Thanks in advance!

Regards,
Wouter


Re: PyFlink DataStream union type mismatch

2021-05-23 Thread Wouter Zorgdrager
Hi Dian, all,

Thanks, that indeed solved my problem. I have two more questions, I'm not
sure if it is better practice to send a new email to the mailing list or to
re-use a thread:

1. I noticed very high latency (multiple seconds per message) for a job
with multiple operators and very low throughput. I suspect this is because
messages are bundled until a size threshold or a time threshold is met (and
in a low-throughput scenario, only the time threshold is triggered).
This is also the impression I get from the configuration page [1].
However, these configuration values seem to be targeted at the Table API and
it is unclear to me how to configure this for the DataStream API. To be
clear, this is in PyFlink.

2. I'm using the JVM Kafka Consumer and Producer for my Python job.
Therefore I had to add the flink-sql-connector-kafka jar to my Flink
environment. I did this by downloading the jar file from Maven and putting
it under 'venv/pyflink/lib'. Is there an easier way? I'm not particularly
a fan of manually changing my venv.
I tried to use stream_execution_environment.add_jars but that was
unsuccessful, I still got a ClassNotFoundException.
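
To make the suspicion in point 1 concrete, here is a small plain-Python model of size-or-time bundling. It is a simplification and not Flink's actual implementation: a bundle is assumed to open when its first record arrives and to flush either when it holds `bundle_size` records or `bundle_time_ms` after it opened. The function name and parameters are hypothetical.

```python
from typing import List


def flush_times(arrivals_ms: List[int], bundle_size: int, bundle_time_ms: int) -> List[int]:
    """For each record (in arrival order), return the time its bundle is flushed."""
    result: List[int] = []
    bundle: List[int] = []  # arrival times of the records in the open bundle
    for t in arrivals_ms:
        # The open bundle timed out before this record arrived: flush it first.
        if bundle and t >= bundle[0] + bundle_time_ms:
            result.extend([bundle[0] + bundle_time_ms] * len(bundle))
            bundle = []
        bundle.append(t)
        # Size threshold reached: flush immediately.
        if len(bundle) == bundle_size:
            result.extend([t] * len(bundle))
            bundle = []
    if bundle:  # leftover records wait for the time threshold
        result.extend([bundle[0] + bundle_time_ms] * len(bundle))
    return result


sparse = [0, 5000, 10000]  # one record every five seconds
waits = [flush - arrival for flush, arrival in zip(flush_times(sparse, 1000, 1000), sparse)]
print(waits)
```

Under this model every sparse record waits the full time threshold before it leaves the operator, and with several Python operators in a row those waits would add up, which is consistent with latencies of multiple seconds at low throughput.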

Hope you can help. As always, thanks a lot!

Regards,
Wouter


[1] -
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/python/python_config/

On Fri, 21 May 2021 at 05:25, Dian Fu  wrote:

> Hi Wouter,
>
> 1) The exception looks like a bug. I have filed a ticket for it:
> https://issues.apache.org/jira/browse/FLINK-22733
>
> 2) Regarding your requirements, I guess you should do it as follows:
> ```
> init_stream = (operator_stream
>     .filter(lambda r: r[0] is None)
>     .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
> )
>
> stateful_operator_stream = (operator_stream
>     .filter(lambda r: r[0] is not None)
>     .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
> )
>
> init_stream.union(stateful_operator_stream).key_by(lambda x: x[0], Types.STRING())
> ```
>
> The reason is that `union` turns a `KeyedStream` into a `DataStream`, and
> you cannot perform stateful operations on a `DataStream` any more.
>
> Regards,
> Dian

ByteSerializationSchema in PyFlink

2021-06-03 Thread Wouter Zorgdrager
Hi all,

I have a PyFlink job connected to a KafkaConsumer and Producer. I want to
work directly with the bytes from and to Kafka because I want to
serialize/deserialize in my Python code rather than in the JVM environment.
Therefore, I can't use the SimpleStringSchema for (de)serialization (the
messages aren't strings anyway). I've tried to create a
TypeInformationSerializationSchema with Types.BYTE(), see the code snippet below:

class ByteSerializer(SerializationSchema, DeserializationSchema):
    def __init__(self, execution_environment):
        gate_way = get_gateway()

        j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
            Types.BYTE().get_java_type_info(),
            get_j_env_configuration(execution_environment),
        )
        SerializationSchema.__init__(self, j_serialization_schema=j_byte_string_schema)
        DeserializationSchema.__init__(
            self, j_deserialization_schema=j_byte_string_schema
        )

The ByteSerializer is used like this:

return FlinkKafkaConsumer(
    ["client_request", "internal"],
    ByteSerializer(self.env._j_stream_execution_environment),
    {
        "bootstrap.servers": "localhost:9092",
        "auto.offset.reset": "latest",
        "group.id": str(uuid.uuid4()),
    },
)

However, this does not seem to work. I think the error is thrown in
the JVM environment, which makes it a bit hard to parse in my Python
stack trace, but I think it boils down to this part of the stack trace:


answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat java.base/java.lang.Thread.run(Thread.java:834)\\n'
gateway_client = 
target_id = None
name = 'org.apache.flink.api.common.serialization.TypeInformationSerializationSchema'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.

        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.

        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
                    raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
                        format(target_id, ".", name), value)
                else:
>                   raise Py4JError(
                        "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
                        format(target_id, ".", name, value))
E                   py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema. Trace:
E                   org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class org.apache.flink.configuration.Configuration]) does not exist
E                       at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
E                       at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
E                       at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:237)
E                       at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
E                       at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
E                       at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
E                       at java.base/java.lang.Thread.run(Thread.java:834)

I hope you can help me out!


Thanks in advance,

Wouter


Re: ByteSerializationSchema in PyFlink

2021-06-04 Thread Wouter Zorgdrager
Hi Dian, all,

Thanks for your suggestion. Unfortunately, it does not seem to work. I get
the following exception:

Caused by: java.lang.NegativeArraySizeException: -2147183315
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:31)
    at org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.deserialize(TypeInformationSerializationSchema.java:92)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

To be more precise, the messages in my Kafka topic are pickled Python
objects. Maybe that is the reason for the exception. I also tried
using Types.PICKLED_BYTE_ARRAY().get_java_type_info(),
but I think that has the same serializer because I get the same exception.
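
A possible explanation for the negative size, sketched under the assumption that BytePrimitiveArraySerializer expects a 4-byte big-endian length in front of the array contents (which is what the failing frame in the trace suggests): a raw pickled payload has no such prefix, so the first four bytes of the pickle header get read as the length. The helper `length_prefix` is hypothetical and only mimics that read.

```python
import pickle
import struct


def length_prefix(payload: bytes) -> int:
    """Interpret the first four bytes as a big-endian signed 32-bit integer."""
    (length,) = struct.unpack(">i", payload[:4])
    return length


# First four bytes of a protocol-4 pickle: PROTO opcode, version 4,
# FRAME opcode, and the low byte of the frame size (0x2D = 45 here).
header = bytes([0x80, 0x04, 0x95, 0x2D])
print(length_prefix(header))  # -2147183315

# Any protocol-4 pickle of a non-trivial object starts with 0x80 0x04 0x95,
# so the "length" always has its sign bit set.
event = pickle.dumps({"key": "user-1", "payload": list(range(10))}, protocol=4)
print(event[:3], length_prefix(event))
```

The value from the handcrafted header matches the -2147183315 in the exception, which would mean the Kafka message was a protocol-4 pickle with a 45-byte frame and that the schema was trying to parse Flink's own array encoding rather than passing the bytes through.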

Any suggestions? Thanks for your help!

Regards,
Wouter

On Fri, 4 Jun 2021 at 08:24, Dian Fu  wrote:

> Hi Wouter,
>
> E   org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class org.apache.flink.configuration.Configuration]) does not exist
>
> As the exception indicates, the constructor doesn't exist.
>
> Could you try the following:
>
> ```
> j_type_info = Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
> j_type_serializer = j_type_info.createSerializer(
>     gate_way.jvm.org.apache.flink.api.common.ExecutionConfig())
>
> j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
>     j_type_info, j_type_serializer)
> ```
>
> Regards,
> Dian

Re: ByteSerializationSchema in PyFlink

2021-06-08 Thread Wouter Zorgdrager
Hi Dian, all,

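The way I resolved it for now is to write my own custom serializer which
only maps bytes to bytes. See the code below:

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class KafkaBytesSerializer implements SerializationSchema<byte[]>,
        DeserializationSchema<byte[]> {

    // Hand the raw Kafka record value through unchanged.
    @Override
    public byte[] deserialize(byte[] bytes) throws IOException {
        return bytes;
    }

    // The stream is unbounded, so no record marks its end.
    @Override
    public boolean isEndOfStream(byte[] bytes) {
        return false;
    }

    // Write the bytes produced by the Python side as-is.
    @Override
    public byte[] serialize(byte[] bytes) {
        return bytes;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return TypeInformation.of(byte[].class);
    }
}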

This code is packaged in a jar and uploaded through env.add_jars. That
works like a charm!
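
With an identity schema like this on the JVM side, all (de)serialization moves into the Python operators. A minimal sketch of what the first and last Python steps could look like, using only the standard library; the function names `decode` and `encode` and the dict-shaped event are hypothetical and not taken from the actual job.

```python
import pickle
from typing import Any, Dict, Tuple


def decode(raw: bytes) -> Tuple[str, Dict[str, Any]]:
    """First Python operator after the Kafka source: bytes -> (key, event)."""
    event = pickle.loads(raw)
    return event["key"], event


def encode(keyed: Tuple[str, Dict[str, Any]]) -> bytes:
    """Last Python operator before the Kafka sink: (key, event) -> bytes."""
    _, event = keyed
    return pickle.dumps(event)


raw_in = pickle.dumps({"key": "user-1", "action": "init"})
key, event = decode(raw_in)
raw_out = encode((key, event))
print(key, pickle.loads(raw_out))
```

Note that `pickle.loads` can execute arbitrary code, so this only makes sense when every producer writing to the topic is trusted.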

Thanks for the help!
Wouter

On Fri, 4 Jun 2021 at 14:40, Wouter Zorgdrager 
wrote:

> Hi Dian, all,
>
> Thanks for your suggestion. Unfortunately, it does not seem to work. I get
> the following exception:
>
> Caused by: java.lang.NegativeArraySizeException: -2147183315
> at
> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
> at
> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:31)
> at
> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.deserialize(TypeInformationSerializationSchema.java:92)
> at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>
> To be more precise, the messages in my Kafka topic are pickled Python
> objects. Maybe that is the reason for the exception, I also tried using 
> Types.PICKLED_BYTE_ARRAY().get_java_type_info()
> but I think that has the same serializer because I get the same exception.
>
> Any suggestions? Thanks for your help!
>
> Regards,
> Wouter
>
> On Fri, 4 Jun 2021 at 08:24, Dian Fu  wrote:
>
>> Hi Wouter,
>>
>> E   org.apache.flink.api.python.shaded.py4j.Py4JException: 
>> Constructor 
>> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class
>>  org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class 
>> org.apache.flink.configuration.Configuration]) does not exist
>>
>>
>> As the exception indicate, the constructor doesn’t exists.
>>
>>
>>
>> Could you try with the following:
>>
>> ```
>> j_type_info= Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
>> j_type_serializer=
>>  
>> j_type_info.createSerializer(gate_way.jvm.org.apache.flink.api.common.ExecutionConfig())
>>
>> j_byte_string_schema = 
>> gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(j_type_info,
>>  j_type_serializer)
>>
>> ```
>>
>> Regards,
>> Dian
>>
>> 2021年6月3日 下午8:51,Wouter Zorgdrager  写道:
>>
>> Hi all,
>>
>> I have a PyFlink job connected to a KafkaConsumer and Producer. I want to
>> directly work with the bytes from and to Kafka because I want to
>> serialize/deserialize in my Python code rather than the JVM environment.
>> Therefore, I can't use the SimpleStringSchema for (de)serialization (the
>> messages aren't strings anyways). I've tried to create a
>> TypeInformationSerializer with Types.BYTE(), see the code snippet below:
>>
>> class ByteSerializer(SerializationSchema, DeserializationSchema):
>>     def __init__(self, execution_environment):
>>         gate_way = get_gateway()
>>
>>         j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
>>             Types.BYTE().get_java_type_info(),
>>             get_j_env_configuration(execution_environment),
>>         )
>>         SerializationSchema.__init__(self,
>>             j_serialization_schema=j_byte_str

PyFlink performance and deployment issues

2021-07-06 Thread Wouter Zorgdrager
Dear community,

I have been struggling a lot with the deployment of my PyFlink job.
Moreover, the performance is very disappointing, especially the latency at
low throughput. I have been playing around with configuration values, but
it has not improved.
In short, I have a DataStream job with multiple Python operators, including
a ProcessFunction. The job reads from Kafka and writes to Kafka again. For
single events, E2E latency has been somewhere between 600ms and 2000ms.
When I increase the throughput, latency is in the order of seconds.
This is with my job configured like this:
config.set_integer("python.fn-execution.bundle.time", 1)
config.set_integer("python.fn-execution.bundle.size", 1)
I tried several configuration values, but the results are similar.
Interestingly, I have a similar Python streaming application written in
Apache Beam which does have low latency: single events are processed in
< 30ms. If I recall correctly, Beam uses the same technique of bundling
events and sending them to Python processes.
On the other hand, Beam uses an in-memory runner when running locally, which
might change the situation. I'm not sure how that compares to a local Flink
MiniCluster.
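
To make the bundling settings above a bit more concrete, here is a toy model
in plain Python. It is not PyFlink's actual implementation: the function name
and the flush rule (flush when the buffer holds `bundle_size` events, or when
`bundle_time_ms` has passed since the first buffered event) are assumptions
for illustration only. It shows how long an event would sit in the buffer
under these two settings.

```python
def buffer_waits(arrivals_ms, bundle_size, bundle_time_ms):
    """Return how long each event waits in the buffer before its bundle is flushed.

    Toy model (assumption): a bundle is flushed when it holds `bundle_size`
    events, or when `bundle_time_ms` has elapsed since its first event arrived.
    `arrivals_ms` must be sorted in ascending order.
    """
    waits = []
    bundle = []  # arrival times of the events currently buffered

    def flush(at_ms):
        # Every buffered event leaves the buffer at time `at_ms`.
        waits.extend(at_ms - t for t in bundle)
        bundle.clear()

    for t in arrivals_ms:
        # Time-based flush: the open bundle's deadline passed before this event.
        if bundle and t >= bundle[0] + bundle_time_ms:
            flush(bundle[0] + bundle_time_ms)
        bundle.append(t)
        # Size-based flush: the bundle is full as soon as this event arrives.
        if len(bundle) >= bundle_size:
            flush(t)

    if bundle:
        # Leftover events leave when the time limit of the last bundle expires.
        flush(bundle[0] + bundle_time_ms)
    return waits
```

In this model, `bundle_size=1` means no event waits in the buffer at all, so
any remaining latency would have to come from somewhere other than the
buffering itself.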

I hoped that performance might improve when I deploy this on a (remote)
Flink cluster. Unfortunately, I had a lot of trouble deploying this PyFlink
job to a remote Flink cluster. In my first attempt, I created a local TM +
JM setup and tried to deploy it using the "./flink run" command.
However, this command created a local MiniCluster again rather than
submitting it to my remote cluster. The full command was:
./flink run --target remote \
-m localhost:8081 \
-pyarch venv.zip \
-pyexec venv.zip/venv/bin/python \
--parallelism 4 \
--python ~/Documents/runtime.py \
--jarfile ~/Documents/combined.jar

Note that venv.zip stores all the Python dependencies for my PyFlink job
whereas combined.jar stores the Java dependencies. I tried several variants
of this command, but it *never* submitted the job to my running JobManager
and always ran it locally.
In my second attempt, I tried deploying it to a Kubernetes cluster using
the following command:

./flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dkubernetes.service-account=flink-service-account \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=pyflink:latest \
-pyarch venv.zip \
-pyexec venv.zip/venv/bin/python \
--parallelism 4 \
-py ~/Documents/runtime.py \
--jarfile ~/Documents/combined.jar

I created the pyflink:latest image by following the documentation here [1].
It was unclear to me if I had to include my project files in this Docker
image.
When running it like this, it did submit the job to the remote K8s cluster,
but I got an exception that it could not find my runtime.py file in some
sort of tmp folder.

Lastly, I wondered if it is possible to set a key for events sent to the
KafkaProducer. Right now, it seems you can only configure some (static)
properties and the serializer.
Is there a workaround to be able to set the key and value of an event
in PyFlink?
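
One conceivable workaround, sketched under the assumption that the sink only
accepts a single byte payload per event: pack the key and the value into one
length-prefixed `bytes` object in Python and split it again on the JVM side in
a custom serialization schema. That Java part is not shown here and would have
to be written separately. The helper names `pack_record` and `unpack_record`
are hypothetical.

```python
import struct


def pack_record(key: bytes, value: bytes) -> bytes:
    """Prefix the key with its length as a 4-byte big-endian unsigned int.

    Big-endian is chosen because it is the default byte order of Java's
    ByteBuffer, which a JVM-side schema could use to read the prefix.
    """
    return struct.pack(">I", len(key)) + key + value


def unpack_record(payload: bytes):
    """Inverse of pack_record: return the (key, value) pair."""
    (key_len,) = struct.unpack_from(">I", payload, 0)
    key = payload[4:4 + key_len]
    value = payload[4 + key_len:]
    return key, value
```

The Python operator would then emit `pack_record(key, value)` and the sink
would only ever see one byte array per event.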

I hope you can help me out with my struggles! Thanks in advance.

Regards,
Wouter

[1] -
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python


Re: PyFlink performance and deployment issues

2021-07-08 Thread Wouter Zorgdrager
Hi Dian, all,

I will come back to the other points asap. However, I’m still confused
about this performance. Is this what I can expect in PyFlink in terms of
performance? ~ 1000ms latency for single events? I also had a very simple
setup where I send 1000 events to Kafka per second and response
times/latencies were around 15 seconds for single events. I understand there
is some Python/JVM overhead but since Flink is so performant, I would
expect much better numbers. In the current situation, PyFlink would just be
unusable if you care about latency. Is this something that you expect to be
improved in the future?

I will verify how this works out for Beam in a remote environment.

Thanks again!
Wouter


On Thu, 8 Jul 2021 at 08:28, Dian Fu  wrote:

> Hi Wouter,
>
> 1) Regarding the performance difference between Beam and PyFlink, I guess
> it’s because you are using an in-memory runner when running it locally in
> Beam. In that case, the code path is totally different compared to
> running in a remote cluster.
> 2) Regarding `flink run`, I’m surprised that it’s running locally.
> Could you submit a Java job with similar commands to see how it runs?
> 3) Regarding `flink run-application`, could you share the exception
> stack?
>
> Regards,
> Dian
>
> On 6 Jul 2021, at 16:58, Wouter Zorgdrager wrote:
>


Re: PyFlink performance and deployment issues

2021-07-08 Thread Wouter Zorgdrager
Hi Xingbo, all,

That is good to know, thank you. Is there any Jira issue I can track? I'm
curious to follow this progress! Do you have any recommendations with
regard to these two configuration values, to get somewhat reasonable
performance?

Thanks a lot!
Wouter

On Thu, 8 Jul 2021 at 10:26, Xingbo Huang  wrote:

> Hi Wouter,
>
> In fact, our users have encountered the same problem. Whenever the `bundle
> size` or `bundle time` is reached, the data in the buffer needs to be sent
> from the JVM to the PVM (Python VM). The JVM then waits for the PVM to
> process the data and send the results back before it forwards them to the
> downstream operator. This leads to a large delay, especially for small
> events, as small messages are hard to process in a pipelined fashion.
>
> I have been working on this problem recently and I plan to include this
> optimization in release-1.14.
>
> Best,
> Xingbo
>
> On Thu, 8 Jul 2021 at 15:41, Wouter Zorgdrager wrote:
>
>> Hi Dian, all,
>>
>> I will come back to the other points asap. However, I’m still confused
>> about this performance. Is this what I can expect in PyFlink in terms of
>> performance? ~ 1000ms latency for single events? I also had a very simple
>> setup where I send 1000 events to Kafka per second and response
>> times/latencies were around 15 seconds for single events. I understand there
>> is some Python/JVM overhead but since Flink is so performant, I would
>> expect much better numbers. In the current situation, PyFlink would just be
>> unusable if you care about latency. Is this something that you expect to be
>> improved in the future?
>>
>> I will verify how this works out for Beam in a remote environment.
>>
>> Thanks again!
>> Wouter
>>
>>
>> On Thu, 8 Jul 2021 at 08:28, Dian Fu  wrote:
>>
>>> Hi Wouter,
>>>
>>> 1) Regarding the performance difference between Beam and PyFlink, I
>>> guess it’s because you are using an in-memory runner when running it
>>> locally in Beam. In that case, the code path is totally different
>>> compared to running in a remote cluster.
>>> 2) Regarding `flink run`, I’m surprised that it’s running locally.
>>> Could you submit a Java job with similar commands to see how it runs?
>>> 3) Regarding `flink run-application`, could you share the exception
>>> stack?
>>>
>>> Regards,
>>> Dian
>>>
>>> On 6 Jul 2021, at 16:58, Wouter Zorgdrager wrote:
>>>


Fwd: PyFlink performance and deployment issues

2021-08-14 Thread Wouter Zorgdrager
Hi all,

I'm still dealing with the PyFlink deployment issue as described below. I
see that I accidentally didn't forward it to the mailing list. Anyways, my
job is stuck in `Initializing` and the logs don't really give me a clue
what is going on.
In my IDE it runs fine. The command I use to submit to the cluster:

export PYFLINK_CLIENT_EXECUTABLE=~/Documents/stateflow-evaluation/venv/bin/python

./flink run \
  --target remote \
  -m localhost:8081 \
  -pyarch venv.zip \
  --pyExecutable venv.zip/venv/bin/python \
  --parallelism 1 \
  --python ~/Documents/stateflow-evaluation/pyflink_runtime.py \
  --jarfile ~/Documents/stateflow-evaluation/benchmark/bin/combined.jar

I hope someone can help me with this because it is a blocker for me.

Thanks in advance,
Wouter
---------- Forwarded message ---------
From: Wouter Zorgdrager 
Date: Thu, 8 Jul 2021 at 12:20
Subject: Re: PyFlink performance and deployment issues
To: Xingbo Huang 


Hi Xingbo, all,

Regarding point 2, I actually made a mistake there. I picked port 8081
(WebUI port) rather than the job submission port (--target remote -m
localhost:8081). For some reason, this does not give an error or warning
and just starts a local cluster. However, now I have another issue: my job
is stuck at initialization. Here is an excerpt from the JM log:

2021-07-08 12:12:18,094 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying _stream_key_by_map_operator (1/1) (attempt #0) with attempt id ca9abcc644c05f62a47b83f391c85cd9 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Process-Stateful-User (1/1) (attempt #0) with attempt id f40fac621cb94c79cdb82146ae5521bb to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (attempt #0) with attempt id 58deef879a00052ba6b3447917005c35 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,484 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,488 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - _stream_key_by_map_operator (1/1) (ca9abcc644c05f62a47b83f391c85cd9) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,489 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,490 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Route-Incoming-Events -> (Filter-On-User -> Map -> (Filter-Init-User -> Init-User, Filter-Stateful-User), Filter -> Map) (1/1) (c48649bd76abaf77486104e8cfcee7d8) switched from DEPLOYING to INITIALIZING.

I run with parallelism 1 and these are the latest loglines from the TM
(there is no obvious error):
2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 5.
2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1 milliseconds.
2021-07-08 12:12:18,791 WARN org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Error while loading kafka-version.properties: inStream parameter is null
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: unknown
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: unknown
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1625739138789
2021-07-08 12:12:18,806 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic clie

Can't start FlinkKafkaProducer using SSL

2021-08-23 Thread Wouter Zorgdrager
Hi all,

I'm trying to deploy a FlinkKafkaProducer in PyFlink on a remote cluster.
Unfortunately, I'm getting the following exception:

Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
... 22 more
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:147)
... 27 more
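
The trace shows that the Kafka client classes are relocated under
`org.apache.flink.kafka.shaded`, while the login module is looked up under its
original, unshaded name. A possible fix, assuming the job runs with a shaded
connector jar and uses SASL/PLAIN, is to reference the relocated class name in
`sasl.jaas.config`. The sketch below only builds the properties dictionary; the
helper name, the `SASL_SSL` protocol and the credentials are placeholders and
not taken from the original configuration.

```python
# Relocation prefix as it appears in the stack trace above.
SHADE_PREFIX = "org.apache.flink.kafka.shaded."
PLAIN_LOGIN_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"


def sasl_plain_properties(bootstrap_servers, username, password, shaded=True):
    """Build Kafka client properties for SASL/PLAIN (hypothetical helper).

    With shaded=True the JAAS entry points at the relocated login module,
    which is the class a shaded Kafka client would be able to load.
    """
    module = (SHADE_PREFIX if shaded else "") + PLAIN_LOGIN_MODULE
    jaas = f'{module} required username="{username}" password="{password}";'
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",  # assumption, adjust to the cluster
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
    }
```

The resulting dictionary would be used as the producer properties passed to
the FlinkKafkaProducer.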

My Kafka 

Row to tuple conversion in PyFlink when switching to 'thread' execution mode

2024-03-29 Thread Wouter Zorgdrager
Dear readers,

I'm running into some unexpected behaviour in PyFlink when switching
execution mode from process to thread. In thread mode, my `Row` gets
converted to a tuple whenever I use a UDF in a map operation. By this
conversion to tuples, we lose critical information such as column names.
Below is a minimal working example (mostly taken from the documentation):

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")


# This does work:
t_env.get_config().set("python.execution-mode", "process")

# This doesn't work:
# t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)


# map operation with a python general scalar function
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)
table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```
This results in the following exception:
2024-03-28 16:32:10 Caused by: pemja.core.PythonException: : 'tuple' object has no attribute 'a'
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
2024-03-28 16:32:10 at .(:1)
2024-03-28 16:32:10 at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)
Note that in process mode this works perfectly fine. Is this expected
behaviour and/or is there a workaround?
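
One possible stopgap, assuming the field order is known and stable, is to
access the fields by position instead of by name, since integer indexing works
on a plain tuple (and, to my knowledge, on `Row` as well). The helper
`map_fields` below is hypothetical and returns a plain tuple so that it can be
checked without a Flink runtime; inside the UDF the result would be wrapped as
`Row(*map_fields(a))`. It does not bring back the column names.

```python
def map_fields(a):
    """Compute (a + 1, b * b) using positional access only.

    `a` may be any indexable sequence with the fields in the order (a, b).
    """
    first, second = a[0], a[1]
    return first + 1, second * second
```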

Kind regards,
Wouter