Re: Guide for building Flink image with Python doesn't work
Hi Gyula,

According to the log, you downloaded the source package of pemja, not the wheel package[1]. I guess you are using an M1 machine. If you install pemja from the source package, you need to have a JDK, the gcc toolchain, and CPython with NumPy in the environment. I believe this part can be solved once you have prepared those tools, but other dependencies of PyFlink 1.15 do not support M1, which makes PyFlink 1.15 impossible to install and use on M1. We have added M1 support in release 1.16[2]. If a large number of M1 users have a strong demand for PyFlink 1.15, we need to consider whether it is worth backporting this support to 1.15, which would break our compatibility guarantees between minor versions.

Best,
Xingbo

[1] https://pypi.org/project/pemja/0.1.4/
[2] https://issues.apache.org/jira/browse/FLINK-25188

Gyula Fóra wrote on Wed, Jul 6, 2022 at 13:56:

> Here it is, copied from the docs essentially:
>
> FROM flink:1.15.0
>
> # install python3: it has updated Python to 3.9 in Debian 11 and so install Python 3.7 from source
> # it currently only supports Python 3.6, 3.7 and 3.8 in PyFlink officially.
> RUN apt-get update -y && \
> apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev git && \
> wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
> tar -xvf Python-3.7.9.tgz && \
> cd Python-3.7.9 && \
> ./configure --without-tests --enable-shared && \
> make -j6 && \
> make install && \
> ldconfig /usr/local/lib && \
> cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
> ln -s /usr/local/bin/python3 /usr/local/bin/python && \
> apt-get clean && \
> rm -rf /var/lib/apt/lists/*
>
> # install PyFlink
> RUN pip3 install apache-flink==1.15.0
>
> And I am running:
> docker build --tag pyflink:latest .
>
> This gives the following error:
>
> #6 64.12 cwd: /tmp/pip-install-9_farey_/pemja/
> #6 64.12 Complete output (1 lines):
> #6 64.12 Include folder should be at '/usr/local/openjdk-11/include' but doesn't exist. Please check you've installed the JDK properly.
>
> A side note: the Dockerfile in the docs is missing git, so initially I got the following error:
>
> #7 57.73 raise OSError("%r was not found" % name)
> #7 57.73 OSError: 'git' was not found
>
> @Weihua Hu can you please send your working Dockerfile?
>
> Gyula
>
> On Wed, Jul 6, 2022 at 4:16 AM Weihua Hu wrote:
>
>> Hi Gyula,
>>
>> I can build the PyFlink image successfully by following this guide. Did you add a dependency outside of the documentation? Could you provide your Dockerfile?
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
>>
>> Best,
>> Weihua
>>
>> On Tue, Jul 5, 2022 at 11:40 PM Gyula Fóra wrote:
>>
>>> Well, in any case, either the official image is incorrect (maybe we should include the JDK by default, not just the JRE) or we should update the documentation regarding the Python Docker build, because it simply doesn't work at the moment.
>>>
>>> I am still looking for a full working example that adds the required Python packages on top of a Flink 1.15.0 base image :)
>>>
>>> Gyula
>>>
>>> On Tue, Jul 5, 2022 at 5:36 PM Weihua Hu wrote:
>>>
>>>> In addition, you can try providing the Dockerfile.
>>>>
>>>> Best,
>>>> Weihua
>>>>
>>>> On Tue, Jul 5, 2022 at 11:24 PM Weihua Hu wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> The base image flink:1.15.0 is built from openjdk:11-jre, and this image only installs the JRE, not the JDK.
>>>>> It looks like the package you want to install (pemja) depends on the JDK.
>>>>> You need to install openjdk-11-jdk in the Dockerfile; take a look at how it is installed in the official image:
>>>>>
>>>>> https://hub.docker.com/layers/openjdk/library/openjdk/11-jdk/images/sha256-bc0af19c7c4f492fe6ff0c1d1c8c0e5dd90ab801385b220347bb91dbe2b4094f?context=explore
>>>>>
>>>>> Best,
>>>>> Weihua
>>>>>
>>>>> On Tue, Jul 5, 2022 at 3:50 PM Gyula Fóra wrote:
>>>>>
>>>>>> Hi All!
>>>>>>
>>>>>> I have been trying to experiment with the Flink Python support on Kubernetes, but I got stuck creating a custom image with all the necessary Python libraries.
>>>>>>
>>>>>> I found this guide in the docs:
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
>>>>>>
>>>>>> However, when I try to build a custom image using it, I get the following error:
>>>>>>
>>>>>> #7 131.7 Collecting pemja==0.1.4
>>>>>> #7 131.8 Downloading pemja-0.1.4.tar.gz (32 kB)
>>>>>> #7 131.9 ERROR: Command errored out with exit status 255:
>>>>>> #7 131.9 command: /usr/local/bin/python3.7 -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"'; __file__='"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(_
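For reference, a rough, untested sketch of what Weihua's suggestion could look like on top of the Dockerfile above. The apt package name and the JAVA_HOME path are assumptions for a Debian/amd64 base image; pemja reads JAVA_HOME at build time, so the pip install has to come after it:

FROM flink:1.15.0

# Assumption: install a full JDK via apt so the JNI headers exist, and point
# JAVA_HOME at it (the JRE-only base image has no $JAVA_HOME/include folder).
RUN apt-get update -y && \
    apt-get install -y openjdk-11-jdk-headless && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64

# ... Python 3.7 build steps from the snippet above ...

# install PyFlink (pemja can now be built against the JDK headers)
RUN pip3 install apache-flink==1.15.0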
Configure a kafka source dynamically (???)
When using the Kafka connector, you need to set the topics in advance (by giving a list of them or a regex pattern for the topic names). Imagine a situation where the topics are not known in advance. Of course, you could use an all-pass regex pattern to match all the topics in the broker, but what I want to know is whether it's possible to read from new topics on demand. E.g., initially the source starts without any topics to read from, so nothing is read until it gets a control message (which could be pushed to a control topic, for example) specifying the set of topics to subscribe to.

I guess this could somehow be implemented using custom subscribers once this issue is merged/closed: https://issues.apache.org/jira/browse/FLINK-24660. But would it be possible to achieve this without having to periodically poll the broker, i.e., in a more reactive (push-based) way? I guess if the Kafka source (or any other source, for that matter) had a control signal like that, then it would be more of an operator than a source, really...

Salva

PS: Does anyone know the current state of FLINK-24660? The changes seem to have been ready to merge for a while.
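For reference, the all-pass-pattern workaround mentioned above looks roughly like this with the KafkaSource builder (broker address, group id and discovery interval are placeholders). Note that new topics matching the pattern are only picked up by periodic polling of the broker, not pushed:

import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")          // placeholder
                .setGroupId("dynamic-topics-demo")           // placeholder
                .setTopicPattern(Pattern.compile(".*"))      // all-pass pattern
                .setStartingOffsets(OffsetsInitializer.latest())
                // poll the broker for new matching topics/partitions every 60s
                .setProperty("partition.discovery.interval.ms", "60000")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();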
Re: [ANNOUNCE] Apache Flink 1.15.1 released
Thanks a lot for being our release manager, David, and thanks to everyone who contributed.

Best,
Xingbo

David Anderson wrote on Fri, Jul 8, 2022 at 06:18:

> The Apache Flink community is very happy to announce the release of Apache Flink 1.15.1, which is the first bugfix release for the Apache Flink 1.15 series.
>
> Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements for this bugfix release:
> https://flink.apache.org/news/2022/07/06/release-1.15.1.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351546
>
> We would like to thank all contributors of the Apache Flink community who made this release possible!
>
> Regards,
> David Anderson
Re: PyFlink: restoring from savepoint
Hi John,

Could you provide more information, e.g. the exact command used to submit the job, the log files, the PyFlink version, etc.?

Regards,
Dian

On Thu, Jul 7, 2022 at 7:53 PM John Tipper wrote:

> Hi all,
>
> I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are being successfully saved to S3. However, I am unable to get the job to start from a savepoint.
>
> The container is started with these args:
>
> “standalone-job”, “-pym”, “foo.main”, “-s”, “s3://”, “-n”
>
> In the JM logs I can see “Starting StandaloneApplicationClusterEntrypoint…” where my arguments are listed.
>
> However, I don’t see any restore occurring in the logs and my application restarts with no state. How do I start a PyFlink job like this from a given savepoint?
>
> Many thanks,
>
> John
>
> Sent from my iPhone
[ANNOUNCE] Apache Flink 1.15.1 released
The Apache Flink community is very happy to announce the release of Apache Flink 1.15.1, which is the first bugfix release for the Apache Flink 1.15 series.

Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for this bugfix release:
https://flink.apache.org/news/2022/07/06/release-1.15.1.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351546

We would like to thank all contributors of the Apache Flink community who made this release possible!

Regards,
David Anderson
Re: Restoring a job from a savepoint
Thank you all, that’s very helpful. It looks like there’s something else that’s causing my cluster to not load my savepoints, so I’ve submitted a separate query for that.

Many thanks,

John

Sent from my iPhone

On 6 Jul 2022, at 21:24, Alexander Fedulov wrote:

Hi John,

use

$ bin/flink run -s s3://my_bucket/path/to/savepoints/<savepoint-dir>

(no trailing slash, including schema), where <savepoint-dir> should contain a valid _metadata file.

You should see logs like this:

INFO o.a.f.r.c.CheckpointCoordinator [] - Starting job foobar from savepoint s3://my_bucket/path/to/savepoints/<savepoint-dir> ()
INFO o.a.f.r.c.CheckpointCoordinator [] - Restoring job foobar from Savepoint 1 @ 0 for foobar located at s3://my_bucket/path/to/savepoints/<savepoint-dir>.

The indication of a correct restore is the absence of exceptions. You might see messages like this one for operators that did not have any state in the savepoint:

INFO o.a.f.r.c.CheckpointCoordinator [] - Skipping empty savepoint state for operator a0f11f7a2c416beb6b7aed14be0d63ca.

Best,
Alexander Fedulov

On Wed, Jul 6, 2022 at 9:50 PM John Tipper wrote:

Hi all,

The docs on restoring a job from a savepoint (https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#resuming-from-savepoints) state that the syntax is:

$ bin/flink run -s :savepointPath [:runArgs]

and that "you may give a path to either the savepoint’s directory or the _metadata file."

If I am using S3 as my store of state:

state.savepoints.dir: s3://my_bucket/path/to/savepoints

and an example savepoint is at:

s3://my_bucket/path/to/savepoints/<savepoint-dir>/_metadata

then what am I supposed to supply to the flink run command? Is it:

1. the full path including filesystem: s3://my_bucket/path/to/savepoints/<savepoint-dir>/_metadata or s3://my_bucket/path/to/savepoints/<savepoint-dir>
2. or the full path: my_bucket/path/to/savepoints/<savepoint-dir>/_metadata or my_bucket/path/to/savepoints/<savepoint-dir>
3. or the path relative to the savepoints directory: <savepoint-dir>/_metadata or <savepoint-dir>

If I supply a directory, do I need to specify a trailing slash?

Also, is there anything that I will see in the logs that will indicate that the restore from a savepoint has been successful?

Many thanks,

John
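To make option 1 concrete, a hypothetical invocation (the savepoint directory name and jar are made up for illustration) would be:

$ bin/flink run -s s3://my_bucket/path/to/savepoints/savepoint-abc123-0123456789ab my-job.jar

or, pointing at the metadata file inside that directory:

$ bin/flink run -s s3://my_bucket/path/to/savepoints/savepoint-abc123-0123456789ab/_metadata my-job.jar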
PyFlink: restoring from savepoint
Hi all,

I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are being successfully saved to S3. However, I am unable to get the job to start from a savepoint.

The container is started with these args:

“standalone-job”, “-pym”, “foo.main”, “-s”, “s3://”, “-n”

In the JM logs I can see “Starting StandaloneApplicationClusterEntrypoint…” where my arguments are listed.

However, I don’t see any restore occurring in the logs and my application restarts with no state. How do I start a PyFlink job like this from a given savepoint?

Many thanks,

John

Sent from my iPhone
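For reference only (the bucket and savepoint id below are made up): with the same flags, a fully specified path would look like the following. As in the flink run case from the other savepoint thread, -s should point at the concrete savepoint directory (or its _metadata file), not just the bucket or the savepoints root:

“standalone-job”, “-pym”, “foo.main”, “-s”, “s3://my-bucket/savepoints/savepoint-abc123-0123456789ab”, “-n”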
Re: Difference between Session Mode and Session Job (Flink Operator)
Awesome, thanks!

On Thu, Jul 7, 2022 at 1:21 PM Gyula Fóra wrote:

> Hi!
>
> The Flink Kubernetes Operator on a high level supports 3 types of resources:
>
> 1. Session Deployment: an empty Flink session cluster
> 2. Application Deployment: a Flink application cluster (single job per cluster)
> 3. Session Job: a Flink job deployed to an existing Session Deployment
>
> So in other words, the Session Deployment only creates the Flink cluster. The Session Job can be deployed to an existing Session Deployment, and it represents an actual Flink job.
>
> I hope that helps :)
> Gyula
>
> On Thu, Jul 7, 2022 at 7:42 AM bat man wrote:
>
>> Hi,
>>
>> I want to understand the difference between session mode and the new deployment mode, Flink Session Job, which I believe was newly introduced as part of the Flink Operator (1.15) release.
>> What's the benefit of using this mode as opposed to session mode, as both run sessions to which Flink jobs can be submitted?
>>
>> Cheers.
>> H.
Setting a timer within broadcast applyToKeyedState() (feature request)
Hello,

I know we can’t set a timer in processBroadcastElement() of the KeyedBroadcastProcessFunction, as there is no key there. However, there is a context.applyToKeyedState() method which lets us iterate over the keyed state, with each invocation scoped to a key. So would it be possible to add access to the TimerService to the Context parameter passed into that callback? Since the code running inside applyToKeyedState() is scoped to a key, we should be able to set up timers for that key too.

Thanks,

James.
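To illustrate the request, here is a rough sketch (Event, Rule and Output stand in for your own types, and the state descriptor is made up). applyToKeyedState() currently only hands the callback the key and its state, so the commented-out timer registration is what this feature request is asking for; today the timer has to be registered on the keyed path, e.g. in processElement():

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RuleTimers extends KeyedBroadcastProcessFunction<String, Event, Rule, Output> {

    private final ValueStateDescriptor<Long> deadlineDesc =
            new ValueStateDescriptor<>("deadline", Long.class);

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<Output> out) throws Exception {
        // Iterate over the keyed state of every key seen so far.
        ctx.applyToKeyedState(deadlineDesc, (key, state) -> {
            state.update(System.currentTimeMillis() + 60_000L);
            // Feature request: something like
            //   timerService.registerProcessingTimeTimer(state.value());
            // is not possible here -- no TimerService is exposed on this code path.
        });
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Output> out) throws Exception {
        // Current workaround: pick the stored deadline up on the next element
        // for this key and register the timer here, where a TimerService exists.
        Long deadline = getRuntimeContext().getState(deadlineDesc).value();
        if (deadline != null) {
            ctx.timerService().registerProcessingTimeTimer(deadline);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Output> out) {
        // React to the per-key timer here.
    }
}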
Re: Difference between Session Mode and Session Job (Flink Operator)
Hi!

The Flink Kubernetes Operator on a high level supports 3 types of resources:

1. Session Deployment: an empty Flink session cluster
2. Application Deployment: a Flink application cluster (single job per cluster)
3. Session Job: a Flink job deployed to an existing Session Deployment

So in other words, the Session Deployment only creates the Flink cluster. The Session Job can be deployed to an existing Session Deployment, and it represents an actual Flink job.

I hope that helps :)
Gyula

On Thu, Jul 7, 2022 at 7:42 AM bat man wrote:

> Hi,
>
> I want to understand the difference between session mode and the new deployment mode, Flink Session Job, which I believe was newly introduced as part of the Flink Operator (1.15) release.
> What's the benefit of using this mode as opposed to session mode, as both run sessions to which Flink jobs can be submitted?
>
> Cheers.
> H.
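To make the distinction concrete, a minimal Session Job sketch (field names follow the operator's quick-start examples; resource names and the jar URI are placeholders) only references an existing session cluster rather than creating one:

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: my-session-job
spec:
  deploymentName: my-session-cluster   # an existing Session Deployment (FlinkDeployment)
  job:
    jarURI: https://example.com/path/to/my-job.jar   # placeholder
    parallelism: 2
    upgradeMode: stateless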