Re: FlinkKafkaConsumer and FlinkKafkaProducer and Kafka Cluster Migration

2022-01-17 Thread Martijn Visser
Hi Alexey,

Just so you know, this feature most likely won't make it to 1.15
unfortunately.

Best regards,

Martijn

On Mon, 17 Jan 2022 at 22:47, Alexey Trenikhun  wrote:

> Thank you Fabian.
>
> We are waiting for FLINK-18450 (watermark alignment) before switching to
> KafkaSource; currently we use extra logic on top of FlinkKafkaConsumer to
> support watermark alignment.
>
> Thanks,
> Alexey
> [FLINK-18450] Add watermark alignment logic to SourceReaderBase - ASF JIRA
> (issues.apache.org): With the per-split watermark support, SourceReaderBase
> should be able to perform watermark alignment so that all the connectors
> that inherit from it would benefit.
>
>
> --
> *From:* Fabian Paul 
> *Sent:* Friday, January 14, 2022 4:02 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: FlinkKafkaConsumer and FlinkKafkaProducer and Kafka
> Cluster Migration
>
> Hi Alexey,
>
> The bootstrap servers are not part of the state, so you are good to go;
> however, please stop all your jobs with a savepoint and resume from it
> with the new properties.
> To migrate the FlinkKafkaConsumer to an empty topic, I guess you can
> discard the state if you ensure that all messages beginning from the
> latest checkpointed offset are in the new topic.
>
> Please be aware that we deprecated the FlinkKafkaConsumer and
> FlinkKafkaProducer with Flink 1.14 in favor of the KafkaSource and
> KafkaSink. In the future, we plan to remove both and they will not
> receive further updates.
>
> Best,
> Fabian
>
> On Fri, Jan 14, 2022 at 3:08 AM Alexey Trenikhun  wrote:
> >
> > Hello,
> >
> > Currently we are using FlinkKafkaConsumer and FlinkKafkaProducer and
> planning to migrate to a different Kafka cluster. Are the bootstrap servers,
> username and password part of the FlinkKafkaConsumer and FlinkKafkaProducer state?
> So if we take a savepoint, change the bootstrap servers and credentials, and start
> the job from the savepoint, will it use the new connection properties or the old
> ones from the savepoint?
> > Assuming that we connected to the new Kafka cluster, I think that the
> FlinkKafkaConsumer offsets will be reset, because the new Kafka cluster will be
> empty and FlinkKafkaConsumer will not be able to seek to the stored offsets, am
> I right?
> >
> > Thanks,
> > Alexey
>


Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored and falls back to default /opt/flink/conf/log4j-console.properties

2022-01-17 Thread Yang Wang
I think you are right. Before 1.13.0, if the log configuration file does
not exist, the logging properties would not be added to the start command.
That is why it could work in 1.12.2.

However, from 1.13.0, we are not using
"kubernetes.container-start-command-template" to generate the JM/TM start
command, but the jobmanager.sh/taskmanager.sh. We do not
have the same logic in the "flink-console.sh".

Maybe we could introduce an environment variable for the log configuration file
name in "flink-console.sh". The default value could be
"log4j-console.properties", and it could be configured by users.
If this makes sense to you, could you please create a ticket?


Best,
Yang

Tamir Sagi  wrote on Mon, 17 Jan 2022 at 22:53:

> Hey Yang,
>
> thanks for answering,
>
> TL;DR
>
> Assuming I have not missed anything, the way the TM and JM are created is
> different between these two versions,
> but it does look like flink-console.sh eventually gets called with the
> same exec command.
>
> In 1.12.2, if org.apache.flink.kubernetes.kubeclient.parameters#hasLog4j
> returns false, then the logging args are not added to the startCommand.
>
>
>    1. Why does the config dir get cleaned once the cluster starts? Even
>    when I pushed log4j-console.properties to the expected location
>    (/opt/flink/conf), the directory includes only flink-conf.yaml.
>    2. I think that by running the exec command "...${FLINK_ENV_JAVA_OPTS}
>    "${log_setting[@]}" "${ARGS[@]}" some properties might be ignored.
>    IMO, it should first look for properties in java.opts provided by the
>    user in flink-conf and fall back to the default in case it's not present.
>
>
> Talking about native Kubernetes mode:
>
> I checked the bash scripts in the flink-dist module; it looks like
> flink-console.sh is similar in both 1.14.2 and 1.12.2 (in 1.14.2 there are more
> cases for the input argument).
>
> logging variable is the same
>
> https://github.com/apache/flink/blob/release-1.14.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L101
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L89
>
> Exec command is the same
>
> https://github.com/apache/flink/blob/release-1.14.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L114
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L99
>
> As for creating TM/JM, in *1.14.2* there is a usage of 2 bash scripts
>
>- kubernetes-jobmanager.sh
>- kubernetes-taskmanager.sh
>
> They get called while decorating the pod, referenced in startCommand.
>
> for instance, JobManager.
>
> https://github.com/apache/flink/blob/release-1.14.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdJobManagerDecorator.java#L58-L59
>
> kubernetes-jobmanager.sh gets called once the container starts, which calls
> flink-console.sh internally and passes the
> deploymentName (kubernetes-application in our case) and args.
>
> In *1.12.2* the decorator sets /docker-entrypoint.sh
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java#L67
>
> and sets the start command
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java#L224
>
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java#L333
>
>
> with additional logging parameter
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java#L421-L425
>
> hasLog4j
>
> https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java#L151-L155
> It checks whether the file exists in the conf dir.
>
> If hasLog4j returns false, then the logging properties are not added to the
> start command (this might be the case, which would explain why it works in 1.12.2).
>
> It then passes 'jobmanager' as the component.
> Looking into /docker-entrypoint.sh, it calls jobmanager.sh, which calls
> flink-console.sh internally.
>
> Have I missed anything?
>
>
> Best,
> Tamir
>
>
> --
> *From:* Yang Wang 
> *Sent:* Monday, January 17, 2022 1:05 PM
> *To:* Tamir Sagi 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored
> and falls back to default /opt/flink/conf/log4j-console.properties
>
>
>
> I think the root cause is that we are using "flink-console.sh" to start
> the JobManager/TaskManager process for native K8s integration after
> FLINK-21128[1].
> So it forces the log4j configuration name to be "log4j-console.properties".

Re: Examples / Documentation for Flink ML 2

2022-01-17 Thread Dong Lin
Hi Bonino,

Thanks for your interest!

Flink ML is currently ready for experienced algorithm developers to try it
out because we have set up the basic APIs and infrastructure to develop
algorithms. Five algorithms (i.e. kmeans, naive bayes, knn, logistic
regression and one-hot encoder) have been implemented in the last release.
Their unit tests can be found here, here and here, and they show how to use
these algorithms (including transform/fit/save/load). From these unit tests
you can find the implementations of these algorithms, which can be used as
reference implementations to develop other algorithms of your interest.
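
For readers who want a quick feel for the API before the website is ready, here is a
rough sketch of the Estimator/Model pattern those unit tests exercise, using KMeans.
It is an illustration only; the input data is made up, and the class names and column
defaults should be double-checked against the flink-ml-lib sources.

```java
import org.apache.flink.ml.clustering.kmeans.KMeans;
import org.apache.flink.ml.clustering.kmeans.KMeansModel;
import org.apache.flink.ml.linalg.DenseVector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KMeansSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Toy input: a few 2-dimensional points exposed as a column named "features".
        DataStream<DenseVector> points = env.fromElements(
                Vectors.dense(0.0, 0.0), Vectors.dense(0.3, 0.0),
                Vectors.dense(9.0, 0.0), Vectors.dense(9.6, 0.0));
        Table input = tEnv.fromDataStream(points).as("features");

        // Estimator -> fit() -> Model -> transform(), as exercised in the unit tests.
        KMeans kmeans = new KMeans().setK(2).setSeed(1L);
        KMeansModel model = kmeans.fit(input);
        Table predictions = model.transform(input)[0];

        tEnv.toDataStream(predictions).print();
        env.execute("flink-ml-kmeans-sketch");
    }
}
```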

We plan to set up a website for Flink ML to provide links to
examples/tutorials, similar to the Flink Statefun website. This
website will likely be set up in March. We are currently working on
developing further infrastructure for benchmarking and optimizing the
machine learning algorithms in Flink ML.

Best Regards,
Dong



On Mon, Jan 17, 2022 at 8:57 PM Dawid Wysakowicz 
wrote:

> I am adding a couple of people who worked on it. Hopefully, they will be
> able to answer you.
> On 17/01/2022 13:39, Bonino Dario wrote:
>
> Dear List,
>
> We are in the process of evaluating Flink ML version 2.0 in the context of
> some ML tasks mainly concerned with classification and clustering.
>
> While algorithms for these two domains are already present, although in a
> (perhaps) limited form in the latest release of Flink ML, we did not find
> any example / documentation that could guide our experiments.
>
> Is some adoption example available, like code, a tutorial or any information
> that might help us in bootstrapping a Flink ML 2 project?
>
> Thank you very much
>
> Best regards
>
> --
> Ing. Dario Bonino, Ph.D
>
> e-m@il: dario.bon...@gmail.com
> www: https://www.linkedin.com/in/dariobonino
> 
>   Dario
>   Bonino
>   slide...@hotmail.com
> 
>
>


Flink Kinesis connector - EFO connection error with http proxy settings

2022-01-17 Thread Gnanamoorthy, Saravanan
Hello,
We are using the Flink Kinesis connector for processing streaming data from
Kinesis. We are running the application behind a proxy. After setting the proxyhost
and proxyport, the connector works with the default publisher
type (Polling), but it doesn't work when we enable the publisher type Enhanced
Fan-Out (EFO). We tried different connector versions but the behaviour
is the same. I am wondering if the proxy settings are ignored for the EFO type. I am
looking forward to your feedback/recommendations.

Flink version: 1.3.5
Java version: 11

Here is the error log:

2022-01-17 18:59:20,707 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 
(fbb512e099d031470403965ba1830e8c) switched from RUNNING to FAILED with failure 
cause: 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil$FlinkKinesisStreamConsumerRegistrarException:
 Error registering stream: a367945-consumer-stream-dit

at 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:125)

at 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:106)

at 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.lazilyRegisterStreamConsumers(StreamConsumerRegistrarUtil.java:75)

at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:429)

at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:365)

at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.createFetcher(FlinkKinesisConsumer.java:536)

at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:308)

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)

at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)

at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

Suppressed: java.lang.NullPointerException

at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)

at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)

at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756)

at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)

at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)

at java.base/java.lang.Thread.run(Thread.java:834)

Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.exception.SdkClientException:
 Unable to execute HTTP request: Network is unreachable: 
kinesis.us-east-1.amazonaws.com/3.227.250.203:443

at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)

at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)

at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.lambda$describeStreamSummary$0(KinesisProxyV2.java:101)

at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.invokeWithRetryAndBackoff(KinesisProxyV2.java:191)

at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.describeStreamSummary(KinesisProxyV2.java:100)

at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.StreamConsumerRegistrar.registerStreamConsumer(StreamConsumerRegistrar.java:90)

at 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil.registerStreamConsumers(StreamConsumerRegistrarUtil.java:122)

... 9 more



Thanks
-Saravan
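
For reference, a minimal sketch of how the EFO publisher is typically switched on for
the FlinkKinesisConsumer; the region, stream name and consumer name below are
placeholders, not taken from this thread. It intentionally leaves out the proxy
host/port keys, since whether the async (SDK v2) client used by EFO honours them is
exactly the open question here, and the property names should be checked against the
ConsumerConfigConstants class of the connector version in use.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class EfoConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder
        // Switch from the default POLLING publisher to EFO and give the consumer a name.
        props.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
                ConsumerConfigConstants.RecordPublisherType.EFO.name());
        props.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-efo-consumer"); // placeholder

        env.addSource(new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), props))
                .print();
        env.execute("kinesis-efo-sketch");
    }
}
```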


Re: FlinkKafkaConsumer and FlinkKafkaProducer and Kafka Cluster Migration

2022-01-17 Thread Alexey Trenikhun
Thank you Fabian.

We are waiting for FLINK-18450 (watermark alignment) before switching to
KafkaSource; currently we use extra logic on top of FlinkKafkaConsumer to
support watermark alignment.

Thanks,
Alexey
[FLINK-18450] Add watermark alignment logic to SourceReaderBase - ASF JIRA
(issues.apache.org): With the per-split watermark support, SourceReaderBase
should be able to perform watermark alignment so that all the connectors that
inherit from it would benefit.



From: Fabian Paul 
Sent: Friday, January 14, 2022 4:02 AM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: FlinkKafkaConsumer and FlinkKafkaProducer and Kafka Cluster 
Migration

Hi Alexey,

The bootstrap servers are not part of the state, so you are good to go;
however, please stop all your jobs with a savepoint and resume from it
with the new properties.
To migrate the FlinkKafkaConsumer to an empty topic, I guess you can
discard the state if you ensure that all messages beginning from the
latest checkpointed offset are in the new topic.

Please be aware that we deprecated the FlinkKafkaConsumer and
FlinkKafkaProducer with Flink 1.14 in favor of the KafkaSource and
KafkaSink. In the future, we plan to remove both and they will not
receive further updates.

Best,
Fabian
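
For illustration, a minimal sketch of how the recommended KafkaSource could be
pointed at the new cluster when resuming from the savepoint. The broker addresses,
topic, group id and deserializer below are placeholders, not taken from this thread.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaMigrationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The connection properties live in the job configuration, not in the savepoint,
        // so pointing the source at the new cluster is just a code/config change.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("new-broker-1:9092,new-broker-2:9092") // placeholder
                .setTopics("my-topic")                                      // placeholder
                .setGroupId("my-group")                                     // placeholder
                // On an empty cluster there are no committed offsets yet, so fall back
                // to EARLIEST instead of failing on the restored position.
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("kafka-migration-sketch");
    }
}
```

The same idea applies to the legacy FlinkKafkaConsumer: the bootstrap servers and
credentials come from the Properties passed at construction time, and only the
offsets are restored from the savepoint.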

On Fri, Jan 14, 2022 at 3:08 AM Alexey Trenikhun  wrote:
>
> Hello,
>
> Currently we are using FlinkKafkaConsumer and FlinkKafkaProducer and planning
> to migrate to a different Kafka cluster. Are the bootstrap servers, username and
> password part of the FlinkKafkaConsumer and FlinkKafkaProducer state? So if we take
> a savepoint, change the bootstrap servers and credentials, and start the job from
> the savepoint, will it use the new connection properties or the old ones from the savepoint?
> Assuming that we connected to the new Kafka cluster, I think that the
> FlinkKafkaConsumer offsets will be reset, because the new Kafka cluster will be
> empty and FlinkKafkaConsumer will not be able to seek to the stored offsets, am I
> right?
>
> Thanks,
> Alexey


Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored and falls back to default /opt/flink/conf/log4j-console.properties

2022-01-17 Thread Tamir Sagi
Hey Yang,

thanks for answering,

TL;DR

Assuming I have not missed anything, the way the TM and JM are created is
different between these two versions,
but it does look like flink-console.sh eventually gets called with the same
exec command.

In 1.12.2, if org.apache.flink.kubernetes.kubeclient.parameters#hasLog4j returns
false, then the logging args are not added to the startCommand.


  1.  Why does the config dir get cleaned once the cluster starts? Even when I
pushed log4j-console.properties to the expected location (/opt/flink/conf),
the directory includes only flink-conf.yaml.
  2.  I think that by running the exec command "...${FLINK_ENV_JAVA_OPTS}
"${log_setting[@]}" "${ARGS[@]}" some properties might be ignored (see the
sketch right after this list).
IMO, it should first look for properties in java.opts provided by the user in
flink-conf and fall back to the default in case it's not present.
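
A possible explanation for point 2, assuming the exec order in flink-console.sh is
${FLINK_ENV_JAVA_OPTS} followed by "${log_setting[@]}": when the same -D flag appears
twice on a java command line, the JVM keeps the last occurrence, so the
-Dlog4j.configurationFile appended by log_setting would win over the one supplied via
env.java.opts. A tiny, stand-alone illustration (not Flink code):

```java
public class DuplicateSystemPropertyDemo {
    // Run with the same flag twice, mimicking env.java.opts followed by log_setting:
    //   java -Dlog4j.configurationFile=/opt/log4j2/log4j2.xml \
    //        -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties \
    //        DuplicateSystemPropertyDemo
    public static void main(String[] args) {
        // Prints the value of the last -D flag on the command line,
        // i.e. the log4j-console.properties path.
        System.out.println(System.getProperty("log4j.configurationFile"));
    }
}
```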

Talking about native Kubernetes mode:

I checked the bash scripts in the flink-dist module; it looks like
flink-console.sh is similar in both 1.14.2 and 1.12.2 (in 1.14.2 there are more
cases for the input argument).

logging variable is the same
https://github.com/apache/flink/blob/release-1.14.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L101
https://github.com/apache/flink/blob/release-1.12.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L89

Exec command is the same
https://github.com/apache/flink/blob/release-1.14.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L114
https://github.com/apache/flink/blob/release-1.12.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L99

As for creating TM/JM, in 1.14.2 there is a usage of 2 bash scripts

  *   kubernetes-jobmanager.sh
  *   kubernetes-taskmanager.sh

They get called while decorating the pod, referenced in startCommand.

for instance, JobManager.
https://github.com/apache/flink/blob/release-1.14.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdJobManagerDecorator.java#L58-L59

kubernetes-jobmanager.sh gets called once the container starts, which calls
flink-console.sh internally and passes the deploymentName (kubernetes-application
in our case) and args.

In 1.12.2 the decorator sets /docker-entrypoint.sh
https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java#L67

and sets the start command
https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java#L224

https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java#L333


with additional logging parameter
https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java#L421-L425

hasLog4j
https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java#L151-L155
It checks whether the file exists in the conf dir.

If hasLog4j returns false, then the logging properties are not added to the
start command (this might be the case, which would explain why it works in 1.12.2).

It then passes 'jobmanager' as the component.
Looking into /docker-entrypoint.sh, it calls jobmanager.sh, which calls
flink-console.sh internally.

Have I missed anything?


Best,
Tamir




From: Yang Wang 
Sent: Monday, January 17, 2022 1:05 PM
To: Tamir Sagi 
Cc: user@flink.apache.org 
Subject: Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored and 
falls back to default /opt/flink/conf/log4j-console.properties



I think the root cause is that we are using "flink-console.sh" to start the 
JobManager/TaskManager process for native K8s integration after FLINK-21128[1].
So it forces the log4j configuration name to be "log4j-console.properties".


[1]. https://issues.apache.org/jira/browse/FLINK-21128


Best,
Yang

Tamir Sagi <tamir.s...@niceactimize.com> wrote on Thu, 13 Jan 2022 at 20:30:
Hey All

I'm running Flink 1.14.2; it seems like it ignores the system property
-Dlog4j.configurationFile and
falls back to /opt/flink/conf/log4j-console.properties

I enabled debug log for log4j2  ( -Dlog4j2.debug)

DEBUG StatusLogger Catching
 java.io.FileNotFoundException: file:/opt/flink/conf/log4j-console.properties 
(No such file or directory)
at java.base/java.io.FileInputStream.open0(Native Method)
at java.base/java.io.FileInputStream.open(Unknown Source)
at java.base/java.io.FileInputStream.(Unknown Source)
at 
org.apache.logging.log4j.core.config.ConfigurationFactory.getInputFromString(ConfigurationFactory.java:370)
at 
org.apache.logging.log4j.core.config.ConfigurationFactory$Factory.getConfiguration(Configurat

Re: Examples / Documentation for Flink ML 2

2022-01-17 Thread Dawid Wysakowicz
I am adding a couple of people who worked on it. Hopefully, they will be
able to answer you.

On 17/01/2022 13:39, Bonino Dario wrote:
>
> Dear List,
>
> We are in the process of evaluating Flink ML version 2.0 in the
> context of some ML tasks mainly concerned with classification and
> clustering.
>
> While algorithms for these two domains are already present, although in a
> (perhaps) limited form in the latest release of Flink ML, we did not
> find any example / documentation that could guide our experiments.
>
> Is some adoption example available, like code, a tutorial or any
> information that might help us in bootstrapping a Flink ML 2 project?
>
> Thank you very much
>
> Best regards
>
> -- 
> Ing. Dario Bonino, Ph.D
>
> e-m@il: dario.bon...@gmail.com 
> www: https://www.linkedin.com/in/dariobonino
> 
>   Dario
>   Bonino
>   slide...@hotmail.com
>  




Re: [statefun] upgrade path - shared cluster use

2022-01-17 Thread Dawid Wysakowicz
I am pretty confident the goal is to be able to run on the newest Flink
version. However, as the release cycle is decoupled for both modules it
might take a bit.

I added Igal to the conversation, who I hope will be able to give you an
idea when you can expect that to happen.

Best,

Dawid

On 17/01/2022 11:48, Filip Karnicki wrote:
> Hi, we're currently using statefun 3.1.1 on a shared cloudera cluster,
> which is going to be updated to 1.14.x 
>
> We think this update might break our jobs, since 3.1.1 is not
> explicitly compatible with 1.14.x
> (https://flink.apache.org/downloads.html#apache-flink-stateful-functions-311
> )
>
> Is there any appetite for statefun to always be made compatible with
> the latest base flink version, or do we need to stop using the shared
> cluster and procure our own? Or is the update of statefun to something
> like 3.2.0 (based on 1.14.x) just a matter of having the resources to
> do it?
>
> Thanks
> Fil





Examples / Documentation for Flink ML 2

2022-01-17 Thread Bonino Dario

Dear List,

We are in the process of evaluating Flink ML version 2.0 in the context
of some ML tasks mainly concerned with classification and clustering.


While algorithms for these two domains are already present, although in a
(perhaps) limited form in the latest release of Flink ML, we did not
find any example / documentation that could guide our experiments.


Is some adoption example available, like code, a tutorial or any
information that might help us in bootstrapping a Flink ML 2 project?


Thank you very much

Best regards

--
Ing. Dario Bonino, Ph.D

e-m@il:dario.bon...@gmail.com  
www:https://www.linkedin.com/in/dariobonino


Dario
Bonino
slide...@hotmail.com



Re: Flink per-job cluster HbaseSinkFunction fails before starting - Configuration issue

2022-01-17 Thread Dawid Wysakowicz
Hey Kamil,

Have you followed this guide to set up Kerberos authentication[1]?

Best,

Dawid

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/security/security-kerberos/

On 14/01/2022 17:09, Kamil ty wrote:
> Hello all,
> I have a flink job that is using the HbaseSinkFunction as specified
> here: flink/flink-connectors/flink-connector-hbase-2.2 at master ·
> a0x8o/flink (github.com)
> 
>
> I'm deploying the job to a cluster in yarn per-job mode. Using flink
> run -d job.jar.
>
> The job gets accepted and I get the address of the UI but when looking
> at the UI the job stays at CREATED and never actually runs. After some
> time it stops. 
>
> This error stands out when looking at the logs:
> WARN [main] org.apache.hadoop.security.LdapGroupsMapping: Exception
> while trying to get password for alias
> hadoop.security.group.mapping.ldap.bind.password:
> java.io.IOException: Configuration problem with provider path.
>         at
> org.apache.hadoop.conf.Configuration.getPasswordFromCredentialProviders(Configuration.java:2428)
>         at
> org.apache.hadoop.conf.Configuration.getPassword(Configuration.java:2347)
>         at
> org.apache.hadoop.security.LdapGroupsMapping.getPassword(LdapGroupsMapping.java:797)
>         at
> org.apache.hadoop.security.LdapGroupsMapping.setConf(LdapGroupsMapping.java:680)
>         at
> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:77)
>         at
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:137)
>         at org.apache.hadoop.security.Groups.(Groups.java:105)
>         at org.apache.hadoop.security.Groups.(Groups.java:101)
>         at
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:476)
>         at
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:352)
>         at
> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:314)
>         at
> org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1996)
>         at
> org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:743)
>         at
> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:693)
>         at
> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:604)
>         at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer.main(ContainerLocalizer.java:468)
> Caused by: java.nio.file.AccessDeniedException:
> /var/run/.../process/1546359139-yarn-NODEMANAGER/creds.localjceks
>         at
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:84)
>         at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>         at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>         at
> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>         at java.nio.file.Files.newByteChannel(Files.java:361)
>         at java.nio.file.Files.newByteChannel(Files.java:407)
>         at
> java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
>         at java.nio.file.Files.newInputStream(Files.java:152)
>         at
> org.apache.hadoop.security.alias.LocalKeyStoreProvider.getInputStreamForFile(LocalKeyStoreProvider.java:76)
>         at
> org.apache.hadoop.security.alias.AbstractJavaKeyStoreProvider.locateKeystore(AbstractJavaKeyStoreProvider.java:325)
>         at
> org.apache.hadoop.security.alias.AbstractJavaKeyStoreProvider.(AbstractJavaKeyStoreProvider.java:86)
>         at
> org.apache.hadoop.security.alias.LocalKeyStoreProvider.(LocalKeyStoreProvider.java:56)
>         at
> org.apache.hadoop.security.alias.LocalJavaKeyStoreProvider.(LocalJavaKeyStoreProvider.java:42)
>         at
> org.apache.hadoop.security.alias.LocalJavaKeyStoreProvider.(LocalJavaKeyStoreProvider.java:34)
>         at
> org.apache.hadoop.security.alias.LocalJavaKeyStoreProvider$Factory.createProvider(LocalJavaKeyStoreProvider.java:68)
>         at
> org.apache.hadoop.security.alias.CredentialProviderFactory.getProviders(CredentialProviderFactory.java:73)
>         at
> org.apache.hadoop.conf.Configuration.getPasswordFromCredentialProviders(Configuration.java:2409)
>         ... 15 more
>
> This seems as if it tries to use password-based authentication,
> but on the cluster only Kerberos-based authentication should be used.
>
> The log output when scheduling the job might be a clue:
> org.apache.flink.yarn.Utils         [] - Attempting to obtain Kerberos
> security token for HBase
> org.apache.flink.yarn.Utils         [] - Hbase is not available (not
> packaged with this application): ClassNotFoundException :
> "org.apache.hadoop.hbase.HbaseConfiguration

Re: Flink native k8s integration vs. operator

2022-01-17 Thread Gyula Fóra
Hi Yang!

Thanks for the input!

I agree with you on both points that you made. Even if we might support
both standalone and native modes in the long run, we should probably build
the first version on top of the native integration.
This I feel will result in a much simpler, minimalistic first version that
will already support the most important features. We are familiar with your
PoC implementation and I think it's a great idea to use that as a base.

As for Java / Go, I think Java is the obvious choice here. I would have to
think very hard to make any good arguments for picking Go :)

Cheers,
Gyula



On Mon, Jan 17, 2022 at 10:30 AM Yang Wang  wrote:

>  Glad to see that the interest of this thread keeps going. And thanks
> Thomas, Gyula, and Marton for driving this effort.
>
> I want to share my two cents about the Flink K8s operator.
>
> > Standalone deployment VS native K8s integration
>
> There is already some feature requirement issue[1] for the existing
> GoogleCloudPlatform/flink-on-k8s-operator to support native K8s
> integration. So I think
> it will be great if the new introduced K8s operator could support native
> K8s mode. I could imagine some advantages for using native mode. e.g.
> dynamic allocation,
> stability improvement, etc.
>
> Compared with standalone + reactive mode, the native K8s could not
> integrate with auto-scaling(allocate/remove TaskManager pods based on
> metrics) well.
> Since the reconcile behavior for standalone and native K8s mode will be
> different, I am not sure whether we will support them both at the very
> beginning.
>
>
> > Go VS Java
>
> Although most of the K8s operators are developed in Go, which could benefit
> from its prosperous ecosystem and various tools, I lean toward developing the
> K8s operator under the Flink umbrella using Java.
> Then it will be easier for Flink contributors to get involved. We could use
> the same Kubernetes Java client as Flink. When Flink exposes some public
> deployment interfaces (e.g. ApplicationDeployer)
> in the future, the K8s operator will also benefit a lot from this.
>
> I already have a simple PoC project of this implementation[2]. Hope you
> could get some inspirations from this.
>
>
> [1].
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/168
> [2]. https://github.com/wangyang0918/flink-native-k8s-operator
>
>
> Best,
> Yang
>
>
>
> Xintong Song  wrote on Fri, 14 Jan 2022 at 15:47:
>
> > Thanks for volunteering to drive this effort, Marton, Thomas and Gyula.
> >
> > Looking forward to the public discussion. Please feel free to reach out
> if
> > there's anything you need from us.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Fri, Jan 14, 2022 at 8:27 AM Chenya Zhang <
> chenyazhangche...@gmail.com>
> > wrote:
> >
> >> Thanks Thomas, Gyula, and Marton for driving this effort! It would
> >> greatly ease the adoption of Apache Flink on Kubernetes and help to
> address
> >> the current operational pain points as mentioned. Look forward to the
> >> proposal and more discussions!
> >>
> >> Best,
> >> Chenya
> >>
> >> On Thu, Jan 13, 2022 at 12:15 PM Márton Balassi <
> balassi.mar...@gmail.com>
> >> wrote:
> >>
> >>> Hi All,
> >>>
> >>> I am pleased to see the level of enthusiasm and technical consideration
> >>> already emerging in this thread. I wholeheartedly support building an
> >>> operator and endorsing it via placing it under the Apache Flink
> umbrella
> >>> (as a separate repository) as the current lack of it is clearly
> becoming
> >>> an
> >>> adoption bottleneck for large scale Flink users. The next logical step
> is
> >>> to write a FLIP to agree on the technical details, so that we can put
> >>> forward the proposal to the Flink PMC for creating a new repository
> with
> >>> a
> >>> clear purpose in mind. I volunteer to work with Thomas and Gyula on the
> >>> initial wording on the proposal which we will put up for public
> >>> discussion
> >>> in the coming weeks.
> >>>
> >>> Best,
> >>> Marton
> >>>
> >>> On Thu, Jan 13, 2022 at 9:22 AM Konstantin Knauf 
> >>> wrote:
> >>>
> >>> > Hi Thomas,
> >>> >
> >>> > Yes, I was referring to a separate repository under Apache Flink.
> >>> >
> >>> > Cheers,
> >>> >
> >>> > Konstantin
> >>> >
> >>> > On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise  wrote:
> >>> >
> >>> >> Hi everyone,
> >>> >>
> >>> >> Thanks for the feedback and discussion. A few additional thoughts:
> >>> >>
> >>> >> [Konstantin] > With respect to common lifecycle management
> operations:
> >>> >> these features are
> >>> >> > not available (within Apache Flink) for any of the other resource
> >>> >> providers
> >>> >> > (YARN, Standalone) either. From this perspective, I wouldn't
> >>> consider
> >>> >> this
> >>> >> > a shortcoming of the Kubernetes integration.
> >>> >>
> >>> >> I think time and evolution of the ecosystem are factors to consider
> as
> >>> >> well. The state and usage of Flink was much different when YARN
> >>> >> integration was novel. Expectations are different today and the

Re: [E] Re: Orphaned job files in HDFS

2022-01-17 Thread Yang Wang
The clean-up of the staging directory is best-effort. If the JobManager
crashed or was killed externally, then it does not have any chance to do the
staging directory clean-up.
AFAIK, we do not have a Flink option to guarantee the clean-up.


Best,
Yang

David Clutter  wrote on Tue, 11 Jan 2022 at 22:59:

> Ok, that makes sense.  I did see some job failures.  However failures
> could happen occasionally.  Is there any option to have the job manager
> clean-up these directories when the job has failed?
>
> On Mon, Jan 10, 2022 at 8:58 PM Yang Wang  wrote:
>
>> IIRC, the staging directory(/user/{name}/.flink/application_xxx) will be
>> deleted automatically if the Flink job reaches global terminal state(e.g.
>> FINISHED, CANCELED, FAILED).
>> So I assume you have stopped the yarn application via "yarn application
>> -kill", not via "bin/flink cancel".
>> If it is the case, then having the residual staging directory is an
>> expected behavior since Flink JobManager does not have a chance to do the
>> clean-up.
>>
>>
>>
>> Best,
>> Yang
>>
>> David Clutter  wrote on Tue, 11 Jan 2022 at 10:08:
>>
>>> I'm seeing files orphaned in HDFS and wondering how to clean them up
>>> when the job is completed.  The directory is /user/yarn/.flink so I am
>>> assuming this is created by flink?  The HDFS in my cluster eventually fills
>>> up.
>>>
>>> Here is my setup:
>>>
>>>- Flink 1.13.1 on AWS EMR
>>>- Executing flink in per-job mode
>>>- Job is submitted every 5m
>>>
>>> In HDFS under /user/yarn/.flink I see a directory created for every
>>> flink job submitted/yarn application.  Each application directory contains
>>> my user jar file, flink-dist jar, /lib with various flink jars,
>>> log4j.properties.
>>>
>>> Is there a property to tell flink to clean up this directory when the
>>> job is completed?
>>>
>>


Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored and falls back to default /opt/flink/conf/log4j-console.properties

2022-01-17 Thread Yang Wang
I think the root cause is that we are using "flink-console.sh" to start the
JobManager/TaskManager process for native K8s integration after
FLINK-21128[1].
So it forces the log4j configuration name to be "log4j-console.properties".


[1]. https://issues.apache.org/jira/browse/FLINK-21128


Best,
Yang

Tamir Sagi  wrote on Thu, 13 Jan 2022 at 20:30:

> Hey All
>
> I'm running Flink 1.14.2; it seems like it ignores the system
> property -Dlog4j.configurationFile and
> falls back to /opt/flink/conf/log4j-console.properties
>
> I enabled debug log for log4j2  ( -Dlog4j2.debug)
>
> DEBUG StatusLogger Catching
>  java.io.FileNotFoundException:
> file:/opt/flink/conf/log4j-console.properties (No such file or directory)
> at java.base/java.io.FileInputStream.open0(Native Method)
> at java.base/java.io.FileInputStream.open(Unknown Source)
> at java.base/java.io.FileInputStream.(Unknown Source)
> at
> org.apache.logging.log4j.core.config.ConfigurationFactory.getInputFromString(ConfigurationFactory.java:370)
> at
> org.apache.logging.log4j.core.config.ConfigurationFactory$Factory.getConfiguration(ConfigurationFactory.java:513)
> at
> org.apache.logging.log4j.core.config.ConfigurationFactory$Factory.getConfiguration(ConfigurationFactory.java:499)
> at
> org.apache.logging.log4j.core.config.ConfigurationFactory$Factory.getConfiguration(ConfigurationFactory.java:422)
> at
> org.apache.logging.log4j.core.config.ConfigurationFactory.getConfiguration(ConfigurationFactory.java:322)
> at
> org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:695)
> at
> org.apache.logging.log4j.core.LoggerContext.reconfigure(LoggerContext.java:716)
> at
> org.apache.logging.log4j.core.LoggerContext.start(LoggerContext.java:270)
> at
> org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:155)
> at
> org.apache.logging.log4j.core.impl.Log4jContextFactory.getContext(Log4jContextFactory.java:47)
> at org.apache.logging.log4j.LogManager.getContext(LogManager.java:196)
> at
> org.apache.logging.log4j.spi.AbstractLoggerAdapter.getContext(AbstractLoggerAdapter.java:137)
> at
> org.apache.logging.slf4j.Log4jLoggerFactory.getContext(Log4jLoggerFactory.java:55)
> at
> org.apache.logging.log4j.spi.AbstractLoggerAdapter.getLogger(AbstractLoggerAdapter.java:47)
> at
> org.apache.logging.slf4j.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:33)
> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:329)
> at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:349)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.(AkkaRpcServiceUtils.java:55)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcSystem.remoteServiceBuilder(AkkaRpcSystem.java:42)
> at
> org.apache.flink.runtime.rpc.akka.CleanupOnCloseRpcSystem.remoteServiceBuilder(CleanupOnCloseRpcSystem.java:77)
> at
> org.apache.flink.runtime.rpc.RpcUtils.createRemoteRpcService(RpcUtils.java:184)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:300)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:243)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:193)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:617)
>
> Where I see the property is being loaded while deploying the cluster
>
> source:{
> class:org.apache.flink.configuration.GlobalConfiguration
> method:loadYAMLResource
> file:GlobalConfiguration.java
> line:213
> }
> message:Loading configuration property: env.java.opts,
> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dumps
> -Dlog4j.configurationFile=/opt/log4j2/log4j2.xml -Dlog4j2.debug=true
>
> in addition,  following the documentation[1], it seems like Flink comes
> with default log4j properties files located in /opt/flink/conf
>
> looking into that dir once the cluster is deployed, only flink-conf.yaml
> is there.
>
>
>
> Docker file content
>
> FROM flink:1.14.2-scala_2.12-java11
> ARG JAR_FILE
> COPY target/${JAR_FILE} $FLINK_HOME/usrlib/flink-job.jar
> ADD log4j2.xml /opt/log4j2/log4j2.xml
>
>
>
> *It perfectly works in 1.12.2 with the same log4j2.xml file and system
> property. *
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/advanced/logging/#configuring-log4j-2
>
>
> Best,
> Tamir
>
>
>

[statefun] upgrade path - shared cluster use

2022-01-17 Thread Filip Karnicki
Hi, we're currently using statefun 3.1.1 on a shared cloudera cluster,
which is going to be updated to 1.14.x

We think this update might break our jobs, since 3.1.1 is not explicitly
compatible with 1.14.x (
https://flink.apache.org/downloads.html#apache-flink-stateful-functions-311)

Is there any appetite for statefun to always be made compatible with the
latest base flink version, or do we need to stop using the shared cluster
and procure our own? Or is the update of statefun to something like 3.2.0
(based on 1.14.x) just a matter of having the resources to do it?

Thanks
Fil


Disable S3 HTTPS host name verification

2022-01-17 Thread Tim Eckhardt
Hi,

 

we are trying to get the S3 integration to work for storing checkpoints and 
savepoints.

Our S3 endpoint is self-hosted so we’re not using AWS in particular.

 

The problem I’m facing is that I’m connecting via HTTPS (443) but I have to 
disable host name verification for now. But I can’t find any configuration 
settings to do so for the S3 connection.

I’ve tried the “security.ssl.verify-hostname: false” setting but it does not 
seem to have any effect on the S3 connection.

 

So far I’ve only used the s3.presto plugin but it seems that there isn’t a 
suitable setting for the hadoop plugin either.

Also, I'm not really sure which plugin is best to use for our use case; is
either of those preferred over the other? What are the differences?

 

The S3 specific settings I’ve set are:

 

s3.endpoint: 

s3.access-key: 

s3.secret-key: 

s3.path.style.access: true

state.checkpoints.dir: S3://checkpoints-bucket

state.savepoints.dir: S3://savepoints-bucket

security.ssl.verify-hostname: false

 

 

Thanks, Tim





Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration

2022-01-17 Thread wangqinghuan
We are using Datahub to address table-level lineage and column-level
lineage for Flink SQL.


On 2022/1/13 23:27, Martijn Visser wrote:

Hi everyone,

I'm currently checking out different metadata platforms, such as 
Amundsen [1] and Datahub [2]. In short, these types of tools try to 
address problems related to topics such as data discovery, data 
lineage and an overall data catalogue.


I'm reaching out to the Dev and User mailing lists to get some 
feedback. It would really help if you could spend a couple of minutes 
to let me know if you already use either one of the two mentioned 
metadata platforms or another one, or are you evaluating such tools? 
If so, is that for the purpose as a catalogue, for lineage or anything 
else? Any type of feedback on these types of tools is appreciated.


Best regards,

Martijn

[1] https://github.com/amundsen-io/amundsen/
[2] https://github.com/linkedin/datahub


How to accelerate state processor with a large savepoint

2022-01-17 Thread Hua Wei Chen
Hi team,

We want to try to use state processor APIs[1] to clean up some legacy
states.

Here are our steps:
1. Create a new savepoint (~= 1.5TB)
2. Submit state processor jobs
3. Write results to a new savepoint

We create 8 task managers with 120 slots to execute it.
Here are the related configurations
```
kubernetes.pod-template-file.taskmanager: /srv/pod-template-taskmanager.yml
kubernetes.taskmanager.cpu: 15
taskmanager.memory.jvm-metaspace.size: 512m
taskmanager.memory.managed.fraction: 0.7
taskmanager.memory.process.size: 52g
taskmanager.memory.task.off-heap.size: 1g # For queryable state
taskmanager.numberOfTaskSlots: 15
taskmanager.rpc.port: 6122
```

And the state processor execution logics are below.
```
  def execute(savePointPath: String, newSavePointPath: String, config:
Config) = {
val env = ExecutionEnvironment.getExecutionEnvironment

val state = Savepoint
  .load(env, savePointPath, new EmbeddedRocksDBStateBackend)
  .readKeyedState(
"read-key-state-id",
new CustomKeyStateReadFunction(config),
createTypeInformation[CustomKey],
createTypeInformation[CustomKeyState],
  )

val transformation = OperatorTransformation
  .bootstrapWith(state)
  .keyBy((x: CustomKeyState) => x.key)
  .transform(new CustomKeyStateBootstrapFunction(config))

Savepoint
  .create(new EmbeddedRocksDBStateBackend(true), MAX_PARALLELISM)
  .withOperator("operator_id", transformation)
  .write(newSavePointPath)

env.execute()
  }
```

However, it seems like the state processing is very slow (13 MB/min) and the
resource usage is very low (CPU: 10-15% / memory: 50-60%).

Are there any tips to speed up the process? Thanks!

-- 
*Regards,*
*Oscar*


Re: Flink native k8s integration vs. operator

2022-01-17 Thread Yang Wang
 Glad to see that the interest of this thread keeps going. And thanks
Thomas, Gyula, and Marton for driving this effort.

I want to share my two cents about the Flink K8s operator.

> Standalone deployment VS native K8s integration

There is already some feature requirement issue[1] for the existing
GoogleCloudPlatform/flink-on-k8s-operator to support native K8s
integration. So I think
it will be great if the new introduced K8s operator could support native
K8s mode. I could imagine some advantages for using native mode. e.g.
dynamic allocation,
stability improvement, etc.

Compared with standalone + reactive mode, the native K8s could not
integrate with auto-scaling(allocate/remove TaskManager pods based on
metrics) well.
Since the reconcile behavior for standalone and native K8s mode will be
different, I am not sure whether we will support them both at the very
beginning.


> Go VS Java

Although most of the K8s operators are developed in Go, which could benefit
from its prosperous ecosystem and various tools, I lean toward developing the
K8s operator under the Flink umbrella using Java.
Then it will be easier for Flink contributors to get involved. We could use
the same Kubernetes Java client as Flink. When Flink exposes some public
deployment interfaces (e.g. ApplicationDeployer)
in the future, the K8s operator will also benefit a lot from this.

I already have a simple PoC project of this implementation[2]. Hope you
could get some inspirations from this.


[1]. https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/168
[2]. https://github.com/wangyang0918/flink-native-k8s-operator


Best,
Yang



Xintong Song  wrote on Fri, 14 Jan 2022 at 15:47:

> Thanks for volunteering to drive this effort, Marton, Thomas and Gyula.
>
> Looking forward to the public discussion. Please feel free to reach out if
> there's anything you need from us.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jan 14, 2022 at 8:27 AM Chenya Zhang 
> wrote:
>
>> Thanks Thomas, Gyula, and Marton for driving this effort! It would
>> greatly ease the adoption of Apache Flink on Kubernetes and help to address
>> the current operational pain points as mentioned. Look forward to the
>> proposal and more discussions!
>>
>> Best,
>> Chenya
>>
>> On Thu, Jan 13, 2022 at 12:15 PM Márton Balassi 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am pleased to see the level of enthusiasm and technical consideration
>>> already emerging in this thread. I wholeheartedly support building an
>>> operator and endorsing it via placing it under the Apache Flink umbrella
>>> (as a separate repository) as the current lack of it is clearly becoming
>>> an
>>> adoption bottleneck for large scale Flink users. The next logical step is
>>> to write a FLIP to agree on the technical details, so that we can put
>>> forward the proposal to the Flink PMC for creating a new repository with
>>> a
>>> clear purpose in mind. I volunteer to work with Thomas and Gyula on the
>>> initial wording on the proposal which we will put up for public
>>> discussion
>>> in the coming weeks.
>>>
>>> Best,
>>> Marton
>>>
>>> On Thu, Jan 13, 2022 at 9:22 AM Konstantin Knauf 
>>> wrote:
>>>
>>> > Hi Thomas,
>>> >
>>> > Yes, I was referring to a separate repository under Apache Flink.
>>> >
>>> > Cheers,
>>> >
>>> > Konstantin
>>> >
>>> > On Thu, Jan 13, 2022 at 6:19 AM Thomas Weise  wrote:
>>> >
>>> >> Hi everyone,
>>> >>
>>> >> Thanks for the feedback and discussion. A few additional thoughts:
>>> >>
>>> >> [Konstantin] > With respect to common lifecycle management operations:
>>> >> these features are
>>> >> > not available (within Apache Flink) for any of the other resource
>>> >> providers
>>> >> > (YARN, Standalone) either. From this perspective, I wouldn't
>>> consider
>>> >> this
>>> >> > a shortcoming of the Kubernetes integration.
>>> >>
>>> >> I think time and evolution of the ecosystem are factors to consider as
>>> >> well. The state and usage of Flink was much different when YARN
>>> >> integration was novel. Expectations are different today and the
>>> >> lifecycle functionality provided by an operator may as well be
>>> >> considered essential to support the concept of a Flink application on
>>> >> k8s. After few years learning from operator experience outside of
>>> >> Flink it might be a good time to fill the gap.
>>> >>
>>> >> [Konstantin] > I still believe that we should keep this focus on low
>>> >> > level composable building blocks (like Jobs and Snapshots) in Apache
>>> >> Flink
>>> >> > to make it easy for everyone to build fitting higher level
>>> abstractions
>>> >> > like a FlinkApplication Custom Resource on top of it.
>>> >>
>>> >> I completely agree that it is important that the basic functions of
>>> >> Flink are solid and continued focus is necessary. Thanks for sharing
>>> >> the pointers, these are great improvements. At the same time,
>>> >> ecosystem, contributor base and user spectrum are growing. There have
>>> >> been significant additions in many areas of Flink including