Re: Guide for building Flink image with Python doesn't work

2022-07-07 Thread Xingbo Huang
Hi Gyula,

According to the log, you downloaded the source package of pemja rather
than its wheel package [1]. I guess you are using an M1 machine. If you
install pemja from the source package, you need a JDK, the gcc toolchain,
and CPython with NumPy in the environment. I believe this error can be
solved once you have prepared those tools, but other dependencies of
PyFlink 1.15 do not support M1, which makes PyFlink 1.15 impossible to
install and use on M1.

We have added M1 support in release 1.16 [2]. If a large number of M1 users
have a strong demand for PyFlink 1.15, I think we need to consider whether
it is necessary to backport this support to 1.15, although that would break
compatibility between minor versions.
Best,
Xingbo

[1] https://pypi.org/project/pemja/0.1.4/
[2] https://issues.apache.org/jira/browse/FLINK-25188
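The failing check is easy to reproduce outside of Docker. Below is a minimal Python sketch of the kind of validation pemja's setup performs when building from source (the path and message mirror the build error quoted later in this thread; the function name is my own, not pemja's API):

```python
import os


def check_jdk_include(java_home: str) -> str:
    """Sketch of the check pemja's setup effectively performs: building
    from source needs the JNI headers under $JAVA_HOME/include, which a
    JRE-only image (e.g. one built from openjdk:11-jre) does not ship."""
    include_dir = os.path.join(java_home, "include")
    if not os.path.isdir(include_dir):
        raise OSError(
            f"Include folder should be at '{include_dir}' but doesn't exist. "
            "Please check you've installed the JDK properly."
        )
    return include_dir
```

On a JDK image (or after installing a full JDK in the Dockerfile), the directory exists and the pip install can proceed.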

Gyula Fóra  wrote on Wed, Jul 6, 2022 at 13:56:

> Here it is, copied from the docs essentially:
>
> FROM flink:1.15.0
>
>
> # install python3: Debian 11 updated Python to 3.9, so install Python 3.7
> # from source, as PyFlink currently only officially supports Python 3.6,
> # 3.7 and 3.8.
>
> RUN apt-get update -y && \
> apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev
> libffi-dev git && \
> wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
> tar -xvf Python-3.7.9.tgz && \
> cd Python-3.7.9 && \
> ./configure --without-tests --enable-shared && \
> make -j6 && \
> make install && \
> ldconfig /usr/local/lib && \
> cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
> ln -s /usr/local/bin/python3 /usr/local/bin/python && \
> apt-get clean && \
> rm -rf /var/lib/apt/lists/*
>
> # install PyFlink
> RUN pip3 install apache-flink==1.15.0
>
> And I am running:
> docker build --tag pyflink:latest .
>
> This gives the following error:
>
>
> #6 64.12   cwd: /tmp/pip-install-9_farey_/pemja/
> #6 64.12   Complete output (1 lines):
> #6 64.12   Include folder should be at '/usr/local/openjdk-11/include' but
> doesn't exist. Please check you've installed the JDK properly.
>
> A side note:
> The Dockerfile in the docs is missing git so initially I got the following
> error:
>
> #7 57.73     raise OSError("%r was not found" % name)
> #7 57.73 OSError: 'git' was not found
>
> @Weihua Hu  can you please send your working
> Dockerfile?
>
> Gyula
>
> On Wed, Jul 6, 2022 at 4:16 AM Weihua Hu  wrote:
>
>> Hi Gyula,
>>
>> I can build the PyFlink image successfully by following this guide. Did you
>> add a dependency outside of the documentation? And could you provide your
>> Dockerfile?
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
>>
>> Best,
>> Weihua
>>
>>
>> On Tue, Jul 5, 2022 at 11:40 PM Gyula Fóra  wrote:
>>
>>> Well, in any case, either the official image is incorrect (maybe we should
>>> include the JDK by default rather than just the JRE) or we should update the
>>> documentation for the Python docker build, because it simply doesn't
>>> work at the moment.
>>>
>>> I am still looking for a full working example that adds the required
>>> Python packages on top of a Flink 1.15.0 base image :)
>>>
>>> Gyula
>>>
>>> On Tue, Jul 5, 2022 at 5:36 PM Weihua Hu  wrote:
>>>
 In addition, could you provide the Dockerfile?

 Best,
 Weihua


 On Tue, Jul 5, 2022 at 11:24 PM Weihua Hu 
 wrote:

> Hi,
>
> The base image flink:1.15.0 is built from openjdk:11-jre, and that image
> only installs the JRE, not the JDK.
> It looks like the package you want to install (pemja) depends on the JDK,
> so you need to install openjdk-11-jdk in your Dockerfile.
> Take a look at how it is installed in the official image:
>
>
> https://hub.docker.com/layers/openjdk/library/openjdk/11-jdk/images/sha256-bc0af19c7c4f492fe6ff0c1d1c8c0e5dd90ab801385b220347bb91dbe2b4094f?context=explore
>
>
> Best,
> Weihua
>
>
> On Tue, Jul 5, 2022 at 3:50 PM Gyula Fóra 
> wrote:
>
>> Hi All!
>>
>> I have been trying to experiment with the Flink python support on
>> Kubernetes but I got stuck creating a custom image with all the necessary
>> python libraries.
>>
>> I found this guide in the docs:
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
>>
>> However when I try to build a custom image using it, I get the
>> following error:
>>
>> #7 131.7 Collecting pemja==0.1.4
>> #7 131.8   Downloading pemja-0.1.4.tar.gz (32 kB)
>> #7 131.9 ERROR: Command errored out with exit status 255:
>> #7 131.9  command: /usr/local/bin/python3.7 -c 'import sys,
>> setuptools, tokenize; sys.argv[0] =
>> '"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';
>> __file__='"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';f=getattr(tokenize,
>> '"'"'open'"'"', open)(_

Configure a kafka source dynamically (???)

2022-07-07 Thread Salva Alcántara
When using the Kafka connector, you need to set the topics in advance (by
giving either a list of them or a regex pattern for the topic names).
Imagine a situation where the topics are not known in advance. Of course,
you could use an all-pass regex pattern to match all the topics in the
broker, but what I want to know is whether it's possible to read from new
topics on demand.

E.g., initially the source starts without any topics to read from, so
nothing is read until it gets a control message (which could be pushed to a
control topic, for example) specifying the set of topics to subscribe to. I
guess this could somehow be implemented using custom subscribers once this
issue is merged/closed:

https://issues.apache.org/jira/browse/FLINK-24660

but would it be possible to achieve this objective without having to
periodically poll the broker, e.g., in a more reactive (push-based) way? I
guess if the Kafka source (or any other source, for that matter) were to
have a control signal like that, then it would be more of an operator than
a source, really...
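For context, the pattern-based subscription works by periodic polling: with partition discovery enabled, the source regularly lists the broker's topics and reconciles its subscription against the pattern. A minimal, Flink-free sketch of that reconciliation step (the function name is mine, not Flink's; a push-based design would run the same step on each control message instead of on a timer):

```python
def reconcile_subscription(current: set, discovered: set):
    """Given the currently subscribed topics and the topics discovered
    on the broker that match the pattern, return (to_add, to_remove).
    A polling subscriber runs this every discovery interval; the
    reactive design asked about above would run it per control message."""
    to_add = discovered - current
    to_remove = current - discovered
    return to_add, to_remove
```

The open question in FLINK-24660 is essentially whether users can plug in their own subscriber to drive this reconciliation, rather than relying only on the built-in list/pattern subscribers.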

Salva

PS: Does anyone know the current state of FLINK-24660? The changes seem to
have been ready to merge for a while.


Re: [ANNOUNCE] Apache Flink 1.15.1 released

2022-07-07 Thread Xingbo Huang
Thanks a lot to David for being our release manager, and to everyone who
contributed.

Best,
Xingbo

David Anderson  wrote on Fri, Jul 8, 2022 at 06:18:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.15.1, which is the first bugfix release for the Apache Flink 1.15
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
>
> https://flink.apache.org/news/2022/07/06/release-1.15.1.html
>
> The full release notes are available in Jira:
>
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351546
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> David Anderson
>


Re: PyFlink: restoring from savepoint

2022-07-07 Thread Dian Fu
Hi John,

Could you provide more information, e.g. the exact command used to submit
the job, the log files, the PyFlink version, etc.?

Regards,
Dian


On Thu, Jul 7, 2022 at 7:53 PM John Tipper  wrote:

> Hi all,
>
> I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are
> being successfully saved to S3. However, I am unable to get the job to
> start from a savepoint.
>
> The container is started with these args:
>
> “standalone-job”, “-pym”, “foo.main”, “-s”, “s3://”,
> “-n”
>
> In the JM logs I can see “Starting
> StandaloneApplicationClusterEntrypoint…” where my arguments are listed.
>
> However, I don’t see any restore occurring in the logs and my application
> restarts with no state. How do I start a PyFlink job like this from a given
> savepoint?
>
> Many thanks,
>
> John
>
> Sent from my iPhone


[ANNOUNCE] Apache Flink 1.15.1 released

2022-07-07 Thread David Anderson
The Apache Flink community is very happy to announce the release of Apache
Flink 1.15.1, which is the first bugfix release for the Apache Flink 1.15
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:

https://flink.apache.org/news/2022/07/06/release-1.15.1.html

The full release notes are available in Jira:


https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351546

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
David Anderson


Re: Restoring a job from a savepoint

2022-07-07 Thread John Tipper
Thank you all, that’s very helpful. It looks like there’s something else that’s 
causing my cluster to not load my savepoints, so I’ve submitted a separate 
query for that.

Many thanks,

John

Sent from my iPhone

On 6 Jul 2022, at 21:24, Alexander Fedulov  wrote:


Hi John,

use $ bin/flink run -s s3://my_bucket/path/to/savepoints/<savepoint-dir> (no 
trailing slash, including the scheme),

where <savepoint-dir> should contain a valid _metadata file.

You should see logs like this:
INFO o.a.f.r.c.CheckpointCoordinator [] - Starting job foobar from savepoint 
s3://my_bucket/path/to/savepoints/<savepoint-dir>
INFO o.a.f.r.c.CheckpointCoordinator [] - Restoring job foobar from 
Savepoint 1 @ 0 for foobar located at 
s3://my_bucket/path/to/savepoints/<savepoint-dir>.

The indication of the correct restore should be the absence of exceptions. You 
might see messages like this one for operators that did not have any state in 
the savepoint:
INFO o.a.f.r.c.CheckpointCoordinator [] - Skipping empty savepoint state for 
operator a0f11f7a2c416beb6b7aed14be0d63ca.
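As a quick sanity check before submitting, you can verify that the path you intend to pass to -s actually points at a restorable savepoint. A small sketch of that rule, that either the savepoint directory or its _metadata file is accepted (this is my own helper for local paths, not a Flink API; an s3:// path would need an S3 client instead of os.path):

```python
import os


def resolve_savepoint(path: str) -> str:
    """Accept either a savepoint directory or its _metadata file and
    return the directory that `flink run -s` can restore from."""
    if os.path.basename(path) == "_metadata":
        path = os.path.dirname(path)
    # a restorable savepoint directory must contain a valid _metadata file
    if not os.path.isfile(os.path.join(path, "_metadata")):
        raise ValueError(f"no _metadata file under '{path}'; not a savepoint")
    return path
```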

Best,
Alexander Fedulov


On Wed, Jul 6, 2022 at 9:50 PM John Tipper  wrote:
Hi all,


The docs on restoring a job from a savepoint 
(https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#resuming-from-savepoints)
 state that the syntax is:


$ bin/flink run -s :savepointPath [:runArgs]

and where "you may give a path to either the savepoint’s directory or the 
_metadata file."


If I am using S3 as my store of state:

state.savepoints.dir: s3://my_bucket/path/to/savepoints

and an example savepoint is at:


s3://my_bucket/path/to/savepoints/<savepoint-id>/_metadata


then what am I supposed to supply to the flink run command?  Is it:


  1.  The full path including the filesystem scheme: 
s3://my_bucket/path/to/savepoints/<savepoint-id>/_metadata or 
s3://my_bucket/path/to/savepoints/<savepoint-id>
  2.  or the full path without the scheme: 
my_bucket/path/to/savepoints/<savepoint-id>/_metadata 
or my_bucket/path/to/savepoints/<savepoint-id>
  3.  or the path relative to the savepoints directory: 
<savepoint-id>/_metadata or <savepoint-id>
If I supply a directory, do I need to specify a trailing slash?

Also, is there anything that I will see in the logs that will indicate that the 
restore from a savepoint has been successful?

Many thanks,

John



PyFlink: restoring from savepoint

2022-07-07 Thread John Tipper
Hi all,

I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are 
being successfully saved to S3. However, I am unable to get the job to start 
from a savepoint.

The container is started with these args:

“standalone-job”, “-pym”, “foo.main”, “-s”, “s3://”, “-n”

In the JM logs I can see “Starting StandaloneApplicationClusterEntrypoint…” 
where my arguments are listed.

However, I don’t see any restore occurring in the logs and my application 
restarts with no state. How do I start a PyFlink job like this from a given 
savepoint?

Many thanks,

John

Sent from my iPhone

Re: Difference between Session Mode and Session Job (Flink Operator)

2022-07-07 Thread bat man
Awesome, thanks!

On Thu, Jul 7, 2022 at 1:21 PM Gyula Fóra  wrote:

> Hi!
>
> The Flink Kubernetes Operator on a high level supports 3 types of
> resources:
>
>1. Session Deployment : Empty Flink Session cluster
>2. Application Deployment: Flink Application cluster (single job /
>cluster)
>3. Session Job: Flink Job deployed to an existing Session Deployment.
>
> So in other words, the Session deployment only creates the Flink cluster.
> The Session job can be deployed to an existing session deployment and it
> represents an actual Flink job.
>
> I hope that helps :)
> Gyula
>
> On Thu, Jul 7, 2022 at 7:42 AM bat man  wrote:
>
>> Hi,
>>
>> I want to understand the difference between session mode and the new
>> deployment mode, Flink Session Job, which I believe was newly introduced as
>> part of the Flink Operator (1.15) release.
>> What's the benefit of using this mode as opposed to session mode, as both
>> run sessions to which Flink jobs can be submitted?
>>
>> Cheers.
>> H.
>>
>


Setting a timer within broadcast applyToKeyedState() (feature request)

2022-07-07 Thread James Sandys-Lumsdaine
Hello,

I know we can’t set a timer in the processBroadcastElement() of the 
KeyedBroadcastProcessFunction as there is no key. 

However, there is a context.applyToKeyedState() method which allows us to 
iterate over the keyed state in the scope of a key. So is it possible to add 
access to the TimerService to the Context parameter passed into that 
delegate? 

Since the code running in the applyToKeyedState() method is scoped to a key we 
should be able to set up timers for that key too. 
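Until such a feature exists, a common workaround is to have applyToKeyedState() record the requested timer in keyed state and let the keyed processElement() register the real timer on its next invocation. A Flink-free sketch of that hand-off (the class, names, and the dict standing in for per-key state are all mine, purely illustrative):

```python
class PendingTimerHandoff:
    """The broadcast side can't reach the TimerService, but it can write
    into keyed state via applyToKeyedState(); the keyed side then turns
    the recorded request into a real timer registration."""

    def __init__(self):
        self.pending = {}     # key -> timestamp, stands in for per-key ValueState
        self.registered = []  # timers the keyed side actually registered

    def apply_to_keyed_state(self, key, timestamp):
        # broadcast side: record the timer request in "keyed state"
        self.pending[key] = timestamp

    def process_element(self, key):
        # keyed side: drain any pending request and register the real timer
        ts = self.pending.pop(key, None)
        if ts is not None:
            self.registered.append((key, ts))
```

The obvious drawback, and the motivation for this feature request, is that the timer only materialises once the next element for that key arrives.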

Thanks,

James. 

Re: Difference between Session Mode and Session Job (Flink Operator)

2022-07-07 Thread Gyula Fóra
Hi!

The Flink Kubernetes Operator on a high level supports 3 types of resources:

   1. Session Deployment : Empty Flink Session cluster
   2. Application Deployment: Flink Application cluster (single job /
   cluster)
   3. Session Job: Flink Job deployed to an existing Session Deployment.

So in other words, the Session deployment only creates the Flink cluster.
The Session job can be deployed to an existing session deployment and it
represents an actual Flink job.

I hope that helps :)
Gyula
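For illustration, a Session Job is declared as its own custom resource that points at the session cluster by name. A sketch under the operator's v1beta1 API (all names and the jar URI here are placeholders):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: example-session-job
spec:
  # references an existing Session Deployment (FlinkDeployment without a job spec)
  deploymentName: example-session-cluster
  job:
    jarURI: https://example.com/path/to/job.jar   # placeholder
    parallelism: 2
    upgradeMode: stateless
```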

On Thu, Jul 7, 2022 at 7:42 AM bat man  wrote:

> Hi,
>
> I want to understand the difference between session mode and the new
> deployment mode, Flink Session Job, which I believe was newly introduced as
> part of the Flink Operator (1.15) release.
> What's the benefit of using this mode as opposed to session mode, as both
> run sessions to which Flink jobs can be submitted?
>
> Cheers.
> H.
>