RE: Helper methods for catching unexpected key changes?

2021-10-07 Thread Schwalbe Matthias
Good morning Dan,

Being short of information on how you arranged your job, I can only make 
general comments:

ReinterpretAsKeyedStream only applies to data streams that are in fact 
partitioned by the same key, i.e. your job would look somewhat like this:

DataStreamUtils.reinterpretAsKeyedStream(
Stream
.keyBy(keyExtractor1)
.process(keyedProcessFunction1)//or any of the other keyed operators
,keyExtractor2 …
)
.process(keyedProcessFunction2) //or any of the other keyed operators

keyExtractor1 and keyExtractor2 need to come to the same result for related 
events (input/output of keyedProcessFuntion1 resp.)

I assume your exception happens in keyedProcessFunction2?

reinterpretAsKeyedStream makes sense if you want to chain keyedProcessFunction1 
and keyedProcessFunction2, otherwise keyBy() will do …

I hope these hints help, otherwise feel free to get back to the mailing list 
with a more detailed description of your arrangement 😊

Cheers

Thias





From: Dan Hill 
Sent: Freitag, 8. Oktober 2021 06:49
To: user 
Subject: Helper methods for catching unexpected key changes?

Hi.  I'm getting the following errors when using reinterpretAsKeyedStream.  I 
don't expect the key to change for rows in reinterpretAsKeyedStream.  Are there 
any utilities that I can use that I can use with reinterpetAsKeyedStream to 
verify that the key doesn't change?  E.g. some wrapper operator?



2021-10-02 16:38:46
java.lang.IllegalArgumentException: key group from 154 to 156 does not contain 
213
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
at 
org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet 
unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von 
e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine 
Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser 
Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per 
e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche 
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng 
verboten.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.


Helper methods for catching unexpected key changes?

2021-10-07 Thread Dan Hill
Hi.  I'm getting the following errors when using reinterpretAsKeyedStream.
I don't expect the key to change for rows in reinterpretAsKeyedStream.  Are
there any utilities that I can use that I can use with
reinterpetAsKeyedStream to verify that the key doesn't change?  E.g. some
wrapper operator?



2021-10-02 16:38:46
java.lang.IllegalArgumentException: key group from 154 to 156 does not
contain 213
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
at
org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)


Re: Kubernetes HA - Reusing storage dir for different clusters

2021-10-07 Thread Yang Wang
When the Flink job reached to global terminal state(FAILED, CANCELED,
FINISHED), all the HA related data(including pointers in ConfigMap and
concrete data in DFS) will be cleaned up automatically.

Best,
Yang

Alexis Sarda-Espinosa  于2021年10月4日周一
下午3:59写道:

> Hello,
>
>
>
> If I deploy a Flink-Kubernetes application with HA, I need to set
> high-availability.storageDir. If my application is a batch job that may run
> multiple times with the same configuration, do I need to manually clean up
> the storage dir between each execution?
>
>
>
> Regards,
>
> Alexis.
>
>
>


Re: Can BroadcastProcessFunction invoke both methods concurrently?

2021-10-07 Thread Caizhi Weng
Hi!

Just like what you said, they won't be invoked concurrently. Flink is using
the actor model in runtime so methods in operators won't be called at the
same time.

By the caching layer I suppose you would like to store the broadcast
messages into the java map for some time and periodically stores them into
state, instead of visiting the state for every message? This is OK and the
SQL API in Flink also has such built-in optimization. If you're interested,
see the mini-batch optimization[1] for details.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation

Trystan  于2021年10月7日周四 上午3:59写道:

> I couldn't find any explicit documentation on whether a broadcast operator
> might invoke processElement and processBroadcastElement concurrently. At
> first I suspected it can, hence the different Contexts (read-write,
> read-only). But the TwoInputStreamOperator
> 
>  -
> which I think the Broadcast operator utilizes? - explicitly states that the
> methods will not be called concurrently.
>
> For context, I am considering implementing essentially a caching layer on
> top of the broadcast state. The main motivation is to avoid the
> deserialization overhead when accessing the state. The broadcast objects
> are large, so even making them pure POJO (which may be a significant
> undertaking) could still be painful. The broadcast objects are also
> guaranteed to come in a single message (a single List) every so
> often. We currently clear the Map state, and fully
> repopulate it on every broadcast. I would like to do the same with a purely
> local / java Map but want to be sure I don't run into any
> races.
>
> I also recognize that this is very close to playing with fire, and is
> exactly why we have a broadcast state where Flink can hide all the danger,
> so would be open to other ideas if this is cardinal sin!
>
> Trystan
>


Re: jdbc connector configuration

2021-10-07 Thread Caizhi Weng
Hi!

These configurations are not required to merely read from a database. They
are here to accelerate the reads by allowing sources to read data in
parallel.

This optimization works by dividing the data into several
(scan.partition.num) partitions and each partition will be read by a task
slot (not a task manager, as a task manager may have multiple task slots).
You can set scan.partition.column to specify the partition key and also set
the lower and upper bounds for the range of data.

Let's say your partition key is the column "k" which ranges from 0 to 999.
If you set the lower bound to 0, the upper bound to 999 and the number of
partitions to 10, then all data satisfying 0 <= k < 100 will be divided
into the first partition and read by the first task slot, all 100 <= k <
200 will be divided into the second partition and read by the second task
slot and so on. So these configurations should have nothing to do with the
number of rows you have, but should be related to the range of your
partition key.

Qihua Yang  于2021年10月7日周四 上午7:43写道:

> Hi,
>
> I am trying to read data from database with JDBC driver. From [1], I have
> to config below parameters. I am not quite sure if I understand it
> correctly. lower-bound is smallest value of the first partition,
> upper-bound is largest value of the last partition. For example, if the db
> table has 1000 rows. lower-bound is 0, upper-bound is 999. Is that correct?
> If  setting scan.partition.num to 10, each partition read 100 row?
> if I set scan.partition.num to 10 and I have 10 task managers. Each task
> manager will pick a partition to read?
>
>- scan.partition.column: The column name used for partitioning the
>input.
>- scan.partition.num: The number of partitions.
>- scan.partition.lower-bound: The smallest value of the first
>partition.
>- scan.partition.upper-bound: The largest value of the last partition.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/
>
> Thanks,
> Qihua
>


Re: Exceeded Checkpoint tolerable failure threshold Exception

2021-10-07 Thread Caizhi Weng
Hi!

You need to look into the root cause of checkpoint failure. You can see the
"Checkpoint" tab to see if checkpointing timeout occurs or see the
"Exception" tab for exception messages other than this one. You can also
dive into the logs for suspicious information.

If checkpoint failures are rare and you would like to allow them,
set execution.checkpointing.tolerable-failed-checkpoints to the number of
checkpoints you would like to tolerate. For documentation see
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-tolerable-failed-checkpoints

Robert Cullen  于2021年10月8日周五 上午12:49写道:

> I have Flink set up with 2 taskmanagers and one jobmanager. I've allocated
> 25 gb of JVM Heap and 15 gb of  Flink managed memory.  I have 2 jobs
> running.  After 3 hours this exception was thrown.  How can I configure
> flink to prevent this from happening?
>
> 2021-10-07 12:38:50
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
> tolerable failure threshold.
> at org.apache.flink.runtime.checkpoint.CheckpointFailureManager
> .handleCheckpointException(CheckpointFailureManager.java:98)
> at org.apache.flink.runtime.checkpoint.CheckpointFailureManager
> .handleJobLevelCheckpointException(CheckpointFailureManager.java:67)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> .abortPendingCheckpoint(CheckpointCoordinator.java:1934)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> .abortPendingCheckpoint(CheckpointCoordinator.java:1906)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> .access$600(CheckpointCoordinator.java:96)
> at org.apache.flink.runtime.checkpoint.
> CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:
> 1990)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:
> 511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.
> ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(
> ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.
> ScheduledThreadPoolExecutor$ScheduledFutureTask.run(
> ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Pyflik job data stream to table conversion declareManagedMemory exception

2021-10-07 Thread Dian Fu
Hi Kamil,

I have checked that this method exists in 1.12.3:
https://github.com/apache/flink/blob/release-1.12.3/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java#L137

Could you double check whether the Flink version is 1.12.3 (not just the
PyFlink version)?

Regards,
Dian



On Tue, Oct 5, 2021 at 11:34 PM Nicolaus Weidner <
nicolaus.weid...@ververica.com> wrote:

> Hi Kamil,
>
> On Tue, Oct 5, 2021 at 9:03 AM Kamil ty  wrote:
>
>> Hello,
>>
>> I'm trying to run a pyflink job in cluster mode (with yarn). My job
>> contains source and sink definitions using Table API which are converted to
>> a datastream and back. Unfortunately I'm getting an unusual exception at:
>> *table = t_env.from_data_stream(ds, 'user_id, first_name, last_name).*
>>
>
> Just to make sure: Is the missing quotation mark just a typo in your mail,
> or your code (right before the closing bracket)?
> *table = t_env.from_data_stream(ds, 'user_id, first_name, last_name['])*
>
> Best regards,
> Nico
>


Re: How to create backpressure with a Statefun remote function?

2021-10-07 Thread Igal Shilman
Hello Christian,

The challenge with generic back pressure and remote functions, is that
StateFun doesn't know if it targets a single process or a fleet of
processes behind a load balancer and an autoscaler.
Triggering back pressure too early might never kick in the autoscaling.

Indeed that parameter you have found will trigger back pressure when the
total number of requests per task slot exceeds that value. There is an
additional param that will trigger back pressure per function address.
This is called: maxNumBatchRequests
And is more fine-grained than the per-task slot parameter. Reducing this
value might be recommend if the total processing time of a single message
is potentially high (CPU intensive/ or a long IO)

I think that this use case is valid, and we need to think about the case
where the set of remote functions is static (up to a manual scale up)
I don't have a good idea at the moment as deciding to rather to back
pressure or not requires some sort of a global knowledge.

What I would recommend is, if it fits your infra, is to consider an auto
scaler for the remote functions according to a metric that makes sense to
you, and use the max-in-flight parameter as a high safety net.

Cheers,
Igal

On Thu 7. Oct 2021 at 14:03, Christian Krudewig (Corporate Development) <
christian.krude...@dpdhl.com> wrote:

> Hello fellow Flink users,
>
> How do you create backpressure with Statefun remote functions? I'm using an
> asynchronous web server for the remote function (Python aiohttp on uvicorn)
> which accepts more requests than its CPU bound backend can handle. That can
> make the requests time out and can trigger a restart loop of the whole
> Flink
> pipeline. Of course only in situations where so many requests are coming
> into the ingress kafka stream that the service cannot handle it anymore.
>
> Desired behavior: Flink only consumes as many records from the input stream
> as the pipeline can handle instead of overloading the remote functions.
>
> What I tried so far:
> 1. Set "statefun.async.max-per-task" in flink.conf to a low number. This
> works. But that is one global static config for all function which cannot
> be
> changed without restarting the cluster when the remote functions are scaled
> up or down.
> 2. Add concurrency limiting to the remote function service. If the function
> service returns failure codes (500, 503, 429) that doesn't seem to create
> backpressure but is handled like a normal failure by flink with retries
> until finally the whole pipeline gets restarted.
> 3. Try the new "io.statefun.transports.v1/async" transport type for the
> endpoints with a low "pool_size" parameter. But reaching the pool size
> seems
> to be treated like an error instead of creating backpressure. Same effect
> as
> option 2.
>
> Is there some other option? How should it be done by design?
>
> Thanks,
>
> Christian
>
>
>


Exceeded Checkpoint tolerable failure threshold Exception

2021-10-07 Thread Robert Cullen
I have Flink set up with 2 taskmanagers and one jobmanager. I've allocated
25 gb of JVM Heap and 15 gb of  Flink managed memory.  I have 2 jobs
running.  After 3 hours this exception was thrown.  How can I configure
flink to prevent this from happening?

2021-10-07 12:38:50
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager
.handleCheckpointException(CheckpointFailureManager.java:98)
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager
.handleJobLevelCheckpointException(CheckpointFailureManager.java:67)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator
.abortPendingCheckpoint(CheckpointCoordinator.java:1934)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator
.abortPendingCheckpoint(CheckpointCoordinator.java:1906)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(
CheckpointCoordinator.java:96)
at org.apache.flink.runtime.checkpoint.
CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:
1990)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:
511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask
.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask
.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
.java:624)
at java.lang.Thread.run(Thread.java:748)

-- 
Robert Cullen
240-475-4490


RE: FlinkJobNotFoundException

2021-10-07 Thread Gusick, Doug S
Hi Matthias,

I just wanted to follow up here. Were you able to access the jobmanager log? If 
so, were you able to find anything around the issues we have been facing?

Best,
Doug

From: Hailu, Andreas [Engineering] 
Sent: Thursday, September 30, 2021 8:56 AM
To: Matthias Pohl ; Gusick, Doug S [Engineering] 

Cc: user@flink.apache.org; Erai, Rahul [Engineering] 

Subject: RE: FlinkJobNotFoundException

Hi Matthias, the log file is quite large (21MB) so mailing it over in its 
entirety may have been a challenge. The file is available here [1], and we’re 
of course happy to share any relevant parts of it with the mailing list.

I think since we’ve shared logs with you before in the past, you weren’t sent 
over an additional welcome email ☺


[1] https://lockbox.gs.com/lockbox/folders/dc2ccacc-f2d2-4d66-a098-461b43e8b65f/

// ah

From: Matthias Pohl mailto:matth...@ververica.com>>
Sent: Thursday, September 30, 2021 2:57 AM
To: Gusick, Doug S [Engineering] 
mailto:doug.gus...@ny.email.gs.com>>
Cc: user@flink.apache.org; Erai, Rahul 
[Engineering] mailto:rahul.e...@ny.email.gs.com>>
Subject: Re: FlinkJobNotFoundException

I didn't receive any email. But we rather not do individual support. Please 
share the logs on the mailing list. This way, anyone is able to participate in 
the discussion.

Best,
Matthias

On Wed, Sep 29, 2021 at 8:12 PM Gusick, Doug S 
mailto:doug.gus...@gs.com>> wrote:
Hi Matthias,

Thank you for getting back. We have been looking into upgrading to a newer 
version, but have not completed full testing just yet.

I was unable to find a previous error in the JM logs. You should have received 
an email with details to a “lockbox”. I have uploaded the job manager logs 
there. Please let me know if you need any more information.

Thank you,
Doug

From: Matthias Pohl mailto:matth...@ververica.com>>
Sent: Wednesday, September 29, 2021 12:00 PM
To: Gusick, Doug S [Engineering] 
mailto:doug.gus...@ny.email.gs.com>>
Cc: user@flink.apache.org; Erai, Rahul 
[Engineering] mailto:rahul.e...@ny.email.gs.com>>
Subject: Re: FlinkJobNotFoundException

Hi Doug,
thanks for reaching out to the community. First of all, 1.9.2 is quite an old 
Flink version. You might want to consider upgrading to a newer version. The 
community only offers support for the two most-recent Flink versions. Newer 
version might include fixes for your issue.

But back to your actual problem: The logs you're providing only show that some 
job switched into FINISHED state. Is there some error showing up earlier in the 
logs which you might have missed? It would be helpful if you could share the 
complete JobManager logs to get a better understanding of what's going on.

Best,
Matthias

On Wed, Sep 29, 2021 at 3:47 PM Gusick, Doug S 
mailto:doug.gus...@gs.com>> wrote:
Hello,

We are facing an issue with some of our applications that are submitting a high 
volume of jobs to Flink (we are using v1.9.2). We are observing that numerous 
jobs (in this case 44 out of 350+) fail with the same FlinkJobNotFoundException 
within a 45 second timeframe.

From our client logs, this is the exception we can see:


Calc Engine: Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (d0991f0ae712a9df710aa03311a32c8c)]

Calc Engine:   at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)

Calc Engine:   at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)

Calc Engine:   at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)

Calc Engine:   at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)

Calc Engine:   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)

Calc Engine:   ... 3 more


This is the first job to fail with the above exception. From the JobManager 
logs, we can see that the job goes to FINISHED State, and then we see the 
following exception:

2021-09-28 04:54:16,936 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink Java 
Job at Tue Sep 28 04:48:21 EDT 2021 (d0991f0ae712a9df710aa03311a32c8c) switched 
from state RUNNING to FINISHED.
2021-09-28 04:54:16,937 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
d0991f0ae712a9df710aa03311a32c8c reached globally terminal state FINISHED.
2021-09-28 04:54:16,939 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the 
JobMaster for job Flink Java Job at Tue Sep 28 04:48:21 EDT 
2021(d0991f0ae712a9df710aa03311a32c8c).
2021-09-28 04:54:16,940 INFO  [flink-akka.actor.default-dispatcher-39] 
org.apache.flink.yarn.YarnResourceManager - Disconnect job 
manager 
0

How to create backpressure with a Statefun remote function?

2021-10-07 Thread Christian Krudewig (Corporate Development)
Hello fellow Flink users,

How do you create backpressure with Statefun remote functions? I'm using an
asynchronous web server for the remote function (Python aiohttp on uvicorn)
which accepts more requests than its CPU bound backend can handle. That can
make the requests time out and can trigger a restart loop of the whole Flink
pipeline. Of course only in situations where so many requests are coming
into the ingress kafka stream that the service cannot handle it anymore.

Desired behavior: Flink only consumes as many records from the input stream
as the pipeline can handle instead of overloading the remote functions.

What I tried so far:
1. Set "statefun.async.max-per-task" in flink.conf to a low number. This
works. But that is one global static config for all function which cannot be
changed without restarting the cluster when the remote functions are scaled
up or down.
2. Add concurrency limiting to the remote function service. If the function
service returns failure codes (500, 503, 429) that doesn't seem to create
backpressure but is handled like a normal failure by flink with retries
until finally the whole pipeline gets restarted.
3. Try the new "io.statefun.transports.v1/async" transport type for the
endpoints with a low "pool_size" parameter. But reaching the pool size seems
to be treated like an error instead of creating backpressure. Same effect as
option 2.

Is there some other option? How should it be done by design?

Thanks,

Christian




smime.p7s
Description: S/MIME cryptographic signature


AW: Deploying python statefun program on standalone Flink cluster

2021-10-07 Thread Christian Krudewig (Corporate Development)
Hello Le,



The whole charm of statefun from my point of view comes with the remote 
functions. Especially on kubernetes it gives you the option to scale and 
deploy the remote function with the core logic independent of the flink 
worker/manager.



Examples are in the playground repository: 
https://github.com/apache/flink-statefun-playground/tree/release-3.1



Best,



Christian





Von: Igal Shilman 
Gesendet: Donnerstag, 7. Oktober 2021 11:20
An: Le Xu 
Cc: user 
Betreff: Re: Deploying python statefun program on standalone Flink cluster



Hello Le,



Currently the only way to execute a Python function with StateFun is through a 
remote function.

This means that you need to host the function separately. [1]



Good luck!

Igal



[1] 
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/http-endpoint/



On Thu, Oct 7, 2021 at 1:19 AM Le Xu mailto:sharonx...@gmail.com> > wrote:

Hello!



I was wondering if there is an example to deploy Python statefun program 
directly on standalone cluster (or if this is supported at all). Right now I 
found an early write up here 

 
saying that in order to deploy a java project we can simply add statefun 
dependency to the project. If I have my on standalone Flink cluster, is there 
a way to run a python program directly on top of it?



Thanks!



Le



smime.p7s
Description: S/MIME cryptographic signature


Re: Deploying python statefun program on standalone Flink cluster

2021-10-07 Thread Igal Shilman
Hello Le,

Currently the only way to execute a Python function with StateFun is
through a remote function.
This means that you need to host the function separately. [1]

Good luck!
Igal

[1]
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/http-endpoint/

On Thu, Oct 7, 2021 at 1:19 AM Le Xu  wrote:

> Hello!
>
> I was wondering if there is an example to deploy Python statefun program
> directly on standalone cluster (or if this is supported at all). Right now
> I found an early write up here
> 
> saying that in order to deploy a java project we can simply add statefun
> dependency to the project. If I have my on standalone Flink cluster, is
> there a way to run a python program directly on top of it?
>
> Thanks!
>
> Le
>