[Flink K8s Operator] Automatic cleanup of terminated deployments

2023-05-14 Thread Paul Lam
Hi all,

Currently, when a job reaches a terminal status (e.g. FINISHED or FAILED),
the FlinkDeployment resource remains until a manual cleanup is performed. I
went through the docs but did not find any way to clean them up
automatically. Am I missing something? Thanks!

Best,
Paul Lam


Re: [Flink K8s Operator] Automatic cleanup of terminated deployments

2023-05-14 Thread Gyula Fóra
There is no such feature currently; Kubernetes resources usually do not
delete themselves :)
The problem I see here is that by deleting the resource you lose all
information about what happened: you won't know whether the job failed or
completed. What is the use case you are thinking about?

If this is something you think would be good to add, please open a JIRA
ticket for it. But in any case this will probably merit a dev list
discussion.
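In the meantime, an external sweeper (e.g. a CronJob) can handle this. A minimal sketch of the selection logic, assuming the operator exposes the job state under status.jobStatus.state (that field path is an assumption, so verify it against your operator version):

```python
# Selection logic for a hypothetical external cleanup sweeper. The status
# field path (status.jobStatus.state) is an assumption -- verify it against
# your flink-kubernetes-operator version before relying on it.
TERMINAL_STATES = {"FINISHED", "FAILED", "CANCELED"}

def terminated_deployments(items):
    """Given FlinkDeployment objects (e.g. the `items` array parsed from
    `kubectl get flinkdeployments -o json`), return the names of those
    whose job reached a terminal state."""
    names = []
    for item in items:
        state = (item.get("status", {}).get("jobStatus") or {}).get("state")
        if state in TERMINAL_STATES:
            names.append(item["metadata"]["name"])
    return names
```

The returned names could then be fed to `kubectl delete flinkdeployment ...`, keeping in mind the caveat above that deleting the resource also deletes the record of how the job ended.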

Gyula

On Sun, May 14, 2023 at 11:54 AM Paul Lam  wrote:

> Hi all,
>
> Currently, when a job reaches a terminal status (e.g. FINISHED or
> FAILED), the FlinkDeployment resource remains until a manual cleanup is
> performed. I went through the docs but did not find any way to clean them
> up automatically. Am I missing something? Thanks!
>
> Best,
> Paul Lam
>


FlinkML

2023-05-14 Thread Danyal Awan
Hello,

For my master's thesis I am comparing ML frameworks on data streams.

What is the current status of FlinkML? Is distributed learning possible on
multiple nodes? If yes, how?

I played around with FlinkML a bit and modeled a simple pipeline for
sentiment analysis on tweets. For this I used the Sentiment140 dataset,
which contains 1.6 million tweets.
Unfortunately I can only use a small amount of data (about 3 samples)
for training; otherwise the TaskManager gets lost or crashes, even though I
have allocated enough memory to the TaskManager (JVM heap size is set to
50 GB). But training should also work with more data, right?

Greetings


Re: JVM Metaspace for Task Managers and Job Managers are not getting released.

2023-05-14 Thread Shammon FY
Hi Ajinkya,

Metaspace memory may need to be released through a full GC. You can try
to trigger a full GC manually in the JobManager and TaskManager, and check
whether the metaspace is released.
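If the metaspace keeps growing across job submissions (often a sign of a user-code classloader leak), it can also help to give it an explicit cap in flink-conf.yaml, so that the leak surfaces as a clear "OutOfMemoryError: Metaspace" instead of unbounded growth. A sketch with illustrative sizes:

```yaml
# Illustrative sizes only -- tune for your jobs.
jobmanager.memory.jvm-metaspace.size: 512m
taskmanager.memory.jvm-metaspace.size: 512m
```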

Best,
Shammon FY

On Sat, May 13, 2023 at 4:01 PM Jiadong lu  wrote:

> Hi, Ajinkya
>
> Maybe some threads in your job were not shut down when the job was closed?
>
> Best,
> Jiadong Lu
>
> On 2023/5/13 4:58, Ajinkya Pathrudkar wrote:
> > Hello,
> >
> > I am observing JVM Metaspace memory for Task Managers and Job Manager is
> > not getting released. Any thoughts?
> >
> > image.png
> >
> >
> > Thanks,
> > Ajinkya
>


(No subject)

2023-05-14 Thread 湘晗刚
Flink 1.10.0 checkpoint to HDFS failed:
Caused by: org.apache.flink.util.SerializedThrowable:
The directory item limit of .../shared is exceeded:
limit=1048576 items=1048576

Query on RestartPipelinedRegionFailoverStrategy

2023-05-14 Thread Prabhu Joseph
Hi, I am testing Flink fine-grained recovery from task failures on Flink
1.17 and am facing some issues where I need some advice. I have a job graph
with 5 operators as below; all connections between operators are pipelined
and the job's parallelism.default is set to 2. I have configured
RestartPipelinedRegionFailoverStrategy with the exponential-delay restart
strategy.
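For reference, this setup corresponds roughly to the following flink-conf.yaml (the backoff values here are illustrative, not my exact ones):

```yaml
jobmanager.execution.failover-strategy: region
restart-strategy: exponential-delay
# Illustrative backoff values
restart-strategy.exponential-delay.initial-backoff: 1s
restart-strategy.exponential-delay.max-backoff: 1min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
```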

A -> B -> C -> D -> E

There are a total of 10 tasks running. The first pipeline  (a1 to e1) runs
on a TaskManager (say TM1), and the second pipeline (a2 to e2) runs on
another TaskManager (say TM2).

a1 -> b1 -> c1 -> d1 -> e1
a2 -> b2 -> c2 -> d2 -> e2

When TM1 failed, I expected that only the 5 tasks a1 to e1 would fail and
be restarted, but all 10 tasks are getting restarted. There is only one
pipelined region, which consists of all 10 execution vertices, and
RestartPipelinedRegionFailoverStrategy#getTasksNeedingRestart returns all
10 tasks. Is this the right behaviour, or could there be an issue? Is it
possible to restart only the pipeline of the failed task (a1 to e1) without
restarting the other parallel pipelines?
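To make the question concrete: which tasks share a fate is determined by how execution vertices are grouped into pipelined regions. A toy union-find sketch (an illustration of the grouping rule, not Flink's actual code) shows that purely pointwise (forward) pipelined connections leave the two parallel pipelines in separate regions, while any all-to-all pipelined exchange (e.g. rebalance or keyBy) transitively connects every subtask into a single region, which would explain all 10 tasks restarting:

```python
# Toy sketch of pipelined-region grouping via union-find.
# NOT Flink's implementation -- just the connectivity rule.

def find(parent, x):
    while parent[x] != x:
        parent[x] = parent[parent[x]]  # path halving
        x = parent[x]
    return x

def union(parent, a, b):
    parent[find(parent, a)] = find(parent, b)

def regions(vertices, pipelined_edges):
    """Group execution vertices connected by pipelined edges."""
    parent = {v: v for v in vertices}
    for a, b in pipelined_edges:
        union(parent, a, b)
    groups = {}
    for v in vertices:
        groups.setdefault(find(parent, v), set()).add(v)
    return sorted(groups.values(), key=lambda s: sorted(s))

ops = "abcde"
vertices = [f"{op}{i}" for op in ops for i in (1, 2)]

# Forward (pointwise): subtask i only feeds subtask i downstream.
forward = [(f"{x}{i}", f"{y}{i}") for x, y in zip(ops, ops[1:]) for i in (1, 2)]
# All-to-all: every upstream subtask feeds every downstream subtask.
all_to_all = [(f"{x}{i}", f"{y}{j}") for x, y in zip(ops, ops[1:])
              for i in (1, 2) for j in (1, 2)]

print(len(regions(vertices, forward)))     # -> 2 (independent pipelines)
print(len(regions(vertices, all_to_all)))  # -> 1 (all 10 tasks in one region)
```

So one thing worth checking is whether any exchange in the job is actually all-to-all rather than forward/chained.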

Thanks,
Prabhu Joseph


Re: Flink Job Failure for version 1.16

2023-05-14 Thread Hangxiang Yu
Hi,
I may have missed something, so could you share more details?

> I have recently migrated from 1.13.6 to 1.16.1, I can see there is a
> performance degradation...


Are you referring to a decrease in checkpoint performance when you mention
the performance decline?
Does it happen when you upgrade from 1.13.6 to 1.16.1 without any
modifications to the configuration and the job?
Could you share the configuration before and after upgrading?

Is there any issue with this Flink version or the new RocksDB version? What
> should be the action item for this Exception?
> The maximum savepoint size is 80.2 GB and we periodically(every 20
> minutes) take the savepoint for the job.
>

The version of RocksDB was upgraded in 1.14, but in theory it should not
increase the checkpoint size.
So you found that the checkpoint size increased after upgrading? Could you
also share some checkpoint metrics / configuration before and after
upgrading?

On Fri, May 12, 2023 at 9:06 PM neha goyal  wrote:

> Hi everyone, can someone please shed some light on when the "Checkpoint
> Coordinator is suspending" error occurs and what I should do to avoid it?
> It is impacting the production pipeline after the version upgrade. Is it
> related to a resource crunch in the pipeline?
> Thank You
>
> On Thu, May 11, 2023 at 10:35 AM neha goyal  wrote:
>
>> I have recently migrated from 1.13.6 to 1.16.1, I can see there is a
>> performance degradation for the Flink pipeline which is using Flink's
>> managed state ListState, MapState, etc. Pipelines are frequently failing
>> with the Exception:
>>
>> 06:59:42.021 [Checkpoint Timer] WARN  o.a.f.r.c.CheckpointFailureManager
>> - Failed to trigger or complete checkpoint 36755 for job
>> d0e1a940adab2981dbe0423efe83f140. (0 consecutive failed attempts so far)
>>  org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>> org.apache.flink.runtime.checkpoint.CheckpointFailureManagerorg.apache.flink.runtime.checkpoint.CheckpointException:
>> Checkpoint expired before completing.
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2165)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:750)
>> 07:18:15.257 [flink-akka.actor.default-dispatcher-31] WARN
>>  a.remote.ReliableDeliverySupervisor - Association with remote system
>> [akka.tcp://fl...@ip-172-31-73-135.ap-southeast-1.compute.internal:43367]
>> has failed, address is now gated for [50] ms. Reason: [Disassociated]
>>  akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
>> akka.remote.ReliableDeliverySupervisor07:18:15.257 [flink-metrics-23] WARN
>>  a.remote.ReliableDeliverySupervisor - Association with remote system
>> [akka.tcp://flink-metr...@ip-172-31-73-135.ap-southeast-1.compute.internal:33639]
>> has failed, address is now gated for [50] ms. Reason: [Disassociated]
>>  akka.event.slf4j.Slf4jLogger$$anonfun$receive$1
>> akka.remote.ReliableDeliverySupervisor07:18:15.331
>> [flink-akka.actor.default-dispatcher-31] WARN
>>  o.a.f.r.c.CheckpointFailureManager - Failed to trigger or complete
>> checkpoint 36756 for job d0e1a940adab2981dbe0423efe83f140. (0 consecutive
>> failed attempts so far)
>>  org.apache.flink.runtime.checkpoint.CheckpointFailureManager
>> org.apache.flink.runtime.checkpoint.CheckpointFailureManagerorg.apache.flink.runtime.checkpoint.CheckpointException:
>> Checkpoint Coordinator is suspending.
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1926)
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46)
>> at
>> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1566)
>> at
>> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1161)
>>
>> Is there any issue with this Flink version or the new RocksDB version?
>> What should be the action item for this Exception?
>> The maximum savepoint size is 80.2 GB and we periodically(every 20
>> minutes) take the savepoint for the job.
>> Checkpoint Type: aligned checkpoint
>>
>

-- 
Best,
Hangxiang.


Re: JVM Metaspace for Task Managers and Job Managers are not getting released.

2023-05-14 Thread Ajinkya Pathrudkar
Ok, I will check.

On Sun, May 14, 2023 at 9:39 PM Shammon FY  wrote:

> Hi Ajinkya,
>
> Metaspace memory may need to be released through a full GC. You can
> try to trigger a full GC manually in the JobManager and TaskManager, and
> check whether the metaspace is released.
>
> Best,
> Shammon FY
>
> On Sat, May 13, 2023 at 4:01 PM Jiadong lu  wrote:
>
>> Hi, Ajinkya
>>
>> Maybe some threads in your job were not shut down when the job was closed?
>>
>> Best,
>> Jiadong Lu
>>
>> On 2023/5/13 4:58, Ajinkya Pathrudkar wrote:
>> > Hello,
>> >
>> > I am observing JVM Metaspace memory for Task Managers and Job Manager
>> is
>> > not getting released. Any thoughts?
>> >
>> > image.png
>> >
>> >
>> > Thanks,
>> > Ajinkya
>>
> --
Thanks & Regards,
Ajinkya Pathrudkar


Re: Issues using PyFlink

2023-05-14 Thread Dian Fu
Hi Jill,

I suspect that PyFlink isn't installed in the Python environment which is
used to run the example. Could you share the complete command you used to
execute the example? The command `./flink-1.17.0/bin/flink run -pyclientexec
venv/bin/python3 --python
flink-1.17.0/examples/python/datastream/word_count.py` looks incomplete to
me.
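A quick way to verify is to run a small check script with the exact interpreter you pass to -pyclientexec; a sketch:

```python
import importlib.util
import sys

def module_status(modules):
    """Map each module name to its file location, or None if not importable
    by the interpreter running this script."""
    out = {}
    for mod in modules:
        try:
            spec = importlib.util.find_spec(mod)
        except ModuleNotFoundError:
            # parent package missing entirely
            spec = None
        out[mod] = spec.origin if spec and spec.origin else None
    return out

if __name__ == "__main__":
    # pyflink and google.protobuf are the imports failing in the traceback
    print("interpreter:", sys.executable)
    for name, path in module_status(["pyflink", "google.protobuf"]).items():
        print(f"{name}: {path or 'MISSING'}")
```

If `pyflink` or `google.protobuf` shows MISSING here while `pip show apache-flink` succeeds, the client is picking up a different interpreter than the one pip installed into.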

Regards,
Dian

On Fri, May 12, 2023 at 2:36 AM Jill Cardamon 
wrote:

> Hi!
>
> I'm new to Flink, and I have been trying to run a simple Python Flink
> script that consumes messages from Kafka, as well as the examples,
> locally, and have hit a few issues.
>
> 1. When I run the word count example using `./flink-1.17.0/bin/flink run
> --python flink-1.17.0/examples/python/datastream/word_count.py`, I get the
> following error:
>
> ```
> Traceback (most recent call last):
>   File
> "/Users/jill/flink-1.17.0/examples/python/datastream/word_count.py", line
> 134, in 
> word_count(known_args.input, known_args.output)
>   File
> "/Users/jill/flink-1.17.0/examples/python/datastream/word_count.py", line
> 89, in word_count
> ds = ds.flat_map(split) \
>   File
> "/Users/jill/flink-1.17.0/opt/python/pyflink.zip/pyflink/datastream/data_stream.py",
> line 354, in flat_map
>   File
> "/Users/jill/flink-1.17.0/opt/python/pyflink.zip/pyflink/datastream/data_stream.py",
> line 654, in process
>   File "", line 259, in load_module
>   File
> "/Users/jill/flink-1.17.0/opt/python/pyflink.zip/pyflink/fn_execution/flink_fn_execution_pb2.py",
> line 22, in 
> ModuleNotFoundError: No module named 'google'
> org.apache.flink.client.program.ProgramAbortException:
> java.lang.RuntimeException: Python process exits with code: 1
> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.lang.RuntimeException: Python process exits with code: 1
> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
> ... 14 more
> ```
>
> This issue happens with and without `-pyclientexec venv/bin/python3`. When
> running the example using `python word_count.py`, I get a similar error,
> but with the `pyflink` module not found instead. I installed PyFlink using
> `python -m pip install apache-flink` within a Python venv (not a conda
> one) and downloaded the corresponding binary. Looking through this mailing
> list's archive as well as Stack Overflow, I saw similar issues, and the
> fixes usually came down to Flink being installed incorrectly. For me,
> Flink and its dependencies (including protobuf) are in
> `.../Library/Python/3.9/lib/python/site-packages/...`. I'm pretty sure I'm
> doing something silly, but I'm not quite sure what the fix is here.
>
> 2. When I run my Python script (which consumes JSON-formatted messages
> from Kafka into a datastream and prints them) using
> `./flink-1.17.0/bin/flink run --python kafka_consumer.py`, I see the same
> issue as the traceback above. When using `python kafka_consumer.py`, the
> program hangs on `env.execute()` and I see no running jobs on the
> dashboard. Is there a good way to go about debugging this? Should I be
> waiting a while for it to start running?
>
> Thanks in advance!
>
> - Jill
>
>
>
>


Re: (No subject)

2023-05-14 Thread Hangxiang Yu
Hi,
It's related to FLINK-11695
 which has not been
resolved until now.
You could increase the limit size of hdfs to alleviate this problem.
BTW, You could also share or check something before modifying the
configuration:
>From the logic of your job (how to use state), does the size and quantity
of this state data meet expectations?
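Concretely, the limit in question is the NameNode's per-directory item cap. An hdfs-site.xml sketch (the value here is illustrative; as far as I know HDFS caps this setting at 6400000, and the NameNode must be restarted for it to take effect):

```xml
<!-- hdfs-site.xml: raise the per-directory entry limit (illustrative value) -->
<property>
  <name>dfs.namenode.fs-limits.max-directory-items</name>
  <value>4194304</value>
</property>
```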

On Mon, May 15, 2023 at 9:42 AM 湘晗刚 <1016465...@qq.com> wrote:

> Flink 1.10.0 checkpoint to HDFS failed:
> Caused by: org.apache.flink.util.SerializedThrowable:
> The directory item limit of .../shared is exceeded:
> limit=1048576 items=1048576
>


-- 
Best,
Hangxiang.


How to pass the TLS certs to the latest version of flink-connector-pulsar

2023-05-14 Thread Bauddhik Anand
I am trying to connect my Flink application to a Pulsar topic for ingesting
data. The topic is active and I am able to ingest the data via a normal
Java application.

When I try to use the Flink application to ingest data from the same
topic, using the latest version of flink-connector-pulsar, i.e. 4.0.0-1.17,
I cannot find anywhere in the documentation how to pass the TLS certs.

I tried the below code:


final StreamExecutionEnvironment envn =
        StreamExecutionEnvironment.getExecutionEnvironment();

Configuration config = new Configuration();

config.setString("pulsar.client.authentication", "tls");
config.setString("pulsar.client.tlsCertificateFilePath", tlsCert);
config.setString("pulsar.client.tlsKeyFilePath", tlsKey);
config.setString("pulsar.client.tlsTrustCertsFilePath", tlsTrustCert);

PulsarSource<String> pulsarSource = PulsarSource.builder()
        .setServiceUrl("serviceurl")
        .setAdminUrl("adminurl")
        .setStartCursor(StartCursor.earliest())
        .setTopics("topicname")
        .setDeserializationSchema(new SimpleStringSchema())
        .setSubscriptionName("test-sub")
        .setConfig(config)
        .build();

DataStream<String> pulsarStream = envn.fromSource(pulsarSource,
        WatermarkStrategy.noWatermarks(), "pulsar-source");

pulsarStream.map(new MapFunction<String, String>() {
    private static final long serialVersionUID = -999736771747691234L;

    public String map(String value) throws Exception {
        return "Receiving from Pulsar : " + value;
    }
}).print();

envn.execute();


As per the documentation, I did not find any built-in method in the
PulsarSource builder to pass the TLS certs, so I tried passing the
PulsarClient options to PulsarSource as a Configuration.

This doesn't seem to work: when I try to deploy the app, the Flink job
is submitted and the JobManager throws the below error.

Caused by: sun.security.validator.ValidatorException: PKIX path
building failed:
sun.security.provider.certpath.SunCertPathBuilderException: unable to
find valid certification path to requested target
at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]
at sun.security.validator.PKIXValidator.engineValidate(Unknown
Source) ~[?:?]
at sun.security.validator.Validator.validate(Unknown Source) ~[?:?]
at sun.security.ssl.X509TrustManagerImpl.validate(Unknown Source) ~[?:?]


Caused by: sun.security.provider.certpath.SunCertPathBuilderException:
unable to find valid certification path to requested target
at sun.security.provider.certpath.SunCertPathBuilder.build(Unknown
Source) ~[?:?]
at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(Unknown
Source) ~[?:?]
at java.security.cert.CertPathBuilder.build(Unknown Source) ~[?:?]
at sun.security.validator.PKIXValidator.doBuild(Unknown Source) ~[?:?]

I have already verified the cert paths and they are correct; I am also
using the same paths as a volume mount for my other apps, and those work
fine.

My question is:

How can I pass the certs to the latest version of
*flink-connector-pulsar*, i.e. *4.0.0-1.17*?
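For reference, my working assumption of the full option set, with placeholder endpoints and paths (the pulsar.client.* / pulsar.admin.* key names follow the connector's configuration style, and the URL schemes are my guess at what TLS requires, so please correct me if they are wrong):

```properties
# Placeholders throughout -- substitute real endpoints and cert paths.
# TLS presumably also needs the pulsar+ssl:// and https:// URL schemes.
pulsar.client.serviceUrl=pulsar+ssl://broker.example.com:6651
pulsar.admin.adminUrl=https://broker.example.com:8443
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationTls
pulsar.client.authParams=tlsCertFile:/certs/tls.crt,tlsKeyFile:/certs/tls.key
pulsar.client.tlsTrustCertsFilePath=/certs/ca.crt
```

The PKIX failure above suggests the trust certs are not being applied to whichever client makes the failing connection (possibly the admin client), so the trust path may need to reach both the consumer client and the admin client.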


Re: JVM Metaspace for Task Managers and Job Managers are not getting released.

2023-05-14 Thread Shammon FY
Hi Ajinkya,

The command 'jcmd <pid> GC.run' can be used to trigger a full GC for a JVM
process. However, it should be noted that this may have a performance
impact on the ongoing computation.

Best,
Shammon FY


On Mon, May 15, 2023 at 10:51 AM Ajinkya Pathrudkar <
ajinkya.pathrudka...@gmail.com> wrote:

> Hi Shammon,
>
> When you say FullGC, are you referring to calling System.gc()? And one
> more thing I am struggling to find: how can I trigger a full GC for the
> JobManager and TaskManager?
>
> Thanks,
> Ajinkya
>
> On Sun, May 14, 2023 at 10:40 PM Ajinkya Pathrudkar <
> ajinkya.pathrudka...@gmail.com> wrote:
>
>> Ok, I will check.
>>
>> On Sun, May 14, 2023 at 9:39 PM Shammon FY  wrote:
>>
>>> Hi Ajinkya,
>>>
>>> Metaspace memory may need to be released through a full GC. You can
>>> try to trigger a full GC manually in the JobManager and TaskManager, and
>>> check whether the metaspace is released.
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Sat, May 13, 2023 at 4:01 PM Jiadong lu  wrote:
>>>
 Hi, Ajinkya

 Maybe some threads in your job were not shut down when the job was
 closed?

 Best,
 Jiadong Lu

 On 2023/5/13 4:58, Ajinkya Pathrudkar wrote:
 > Hello,
 >
 > I am observing JVM Metaspace memory for Task Managers and Job Manager
 is
 > not getting released. Any thoughts?
 >
 > image.png
 >
 >
 > Thanks,
 > Ajinkya

>>> --
>> Thanks & Regards,
>> Ajinkya Pathrudkar
>>
> --
> Thanks & Regards,
> Ajinkya Pathrudkar
>