How to tell between a local mode run vs. remote mode run?

2021-05-04 Thread Yik San Chan
Hi,

According to
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/faq/

> When executing jobs in mini cluster(e.g. when executing jobs in IDE) ...
please remember to explicitly wait for the job execution to finish as these
APIs are asynchronous.

I would like my program to be able to run in both local mode and remote mode.
Therefore I would like to do something like:

```python
result = ...  # e.g. the result object returned by an execute call
if local_mode:
    # mini cluster (e.g. running from the IDE): block until the job finishes
    result.wait()
else:
    # remote mode: submission is asynchronous, no need to wait
    pass
```

Is there a way to tell whether the program is running in local mode or remote
mode?

Best,
Yik San


Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-04 Thread Fuyao Li
Hello All,

I also checked the native-k8s’s automatically generated configmap. It only 
contains the flink-conf.yaml, but no log4j.properties. I am not very familiar 
with the implementation details behind native k8s.

That should be the root cause. Could you check the implementation and help me
locate the potential problem?
Yang’s initial code: 
https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java
My modified version: 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java

Thank you so much.

Best,
Fuyao

From: Fuyao Li 
Date: Tuesday, May 4, 2021 at 19:34
To: Austin Cawley-Edwards , matth...@ververica.com 
, Yang Wang 
Cc: user 
Subject: Re: [External] : Re: StopWithSavepoint() method doesn't work in Java 
based flink native k8s operator
Hello Austin, Yang,

For the logging issue, I think I have found something worth noticing.

They are all based on base image flink:1.12.1-scala_2.11-java11

Dockerfile: 
https://pastebin.ubuntu.com/p/JTsHygsTP6/

In the JM and TM pods provisioned by the k8s operator, there is only flink-conf.yaml
in the $FLINK_HOME/conf directory. Even when I tried to add these configurations
to the image in advance, the operator seems to override the directory and
remove all other log4j configuration files. This is why the logs can't be
printed correctly.
root@flink-demo-5fc78c8cf-hgvcj:/opt/flink/conf# ls
flink-conf.yaml


However, the pods provisioned by the Flink native k8s CLI do contain some
log4j-related configuration files.

root@test-application-cluster-79c7f9dcf7-44bq8:/opt/flink/conf# ls
flink-conf.yaml  log4j-console.properties  logback-console.xml


The native Kubernetes operator pod can print logs correctly because it has the
log4j.properties file mounted into the /opt/flink/conf/ directory [1].
For the Flink pods, it seems that only a flink-conf.yaml is injected
there [2][3]; no log4j-related ConfigMap is configured. That makes the logs in
those pods unavailable.

I am not sure how to inject something similar into the Flink pods. Maybe by adding
a structure similar to the one in [1] into the cr.yaml, so that such a
ConfigMap would make the log4j.properties available to the Flink CRD?

I am kind of confused about how to implement this. The deployment is a one-step
operation in [4], and I don't know how to make a ConfigMap available to it. Maybe I
can only use the new pod template feature in Flink 1.13 to do this?
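
For concreteness, what I am imagining is something along these lines for creating
such a ConfigMap from the operator (a rough sketch using the fabric8 client the
operator already uses; the names and the file path are made up, and the ConfigMap
would still need to be mounted into the JM/TM pods somehow):

```java
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.nio.file.Files;
import java.nio.file.Path;

public class LoggingConfigMapSketch {
    public static void main(String[] args) throws Exception {
        String namespace = "default"; // assumption: namespace of the Flink deployment
        // assumption: the log4j config is packaged somewhere readable by the operator
        String log4jConfig = Files.readString(
                Path.of("/opt/flink/conf-template/log4j-console.properties"));

        ConfigMap loggingConfigMap = new ConfigMapBuilder()
                .withNewMetadata()
                    .withName("flink-demo-logging") // made-up name
                    .withNamespace(namespace)
                .endMetadata()
                .addToData("log4j-console.properties", log4jConfig)
                .build();

        try (KubernetesClient client = new DefaultKubernetesClient()) {
            client.configMaps().inNamespace(namespace).createOrReplace(loggingConfigMap);
        }
    }
}
```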



[1] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/deploy/flink-native-k8s-operator.yaml#L58
[2] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/deploy/cr.yaml#L21
[3] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/Utils/FlinkUtils.java#L83
[4] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java#L176

Best,
Fuyao

From: Fuyao Li 
Date: Tuesday, May 4, 2021 at 15:23
To: Austin Cawley-Edwards , matth...@ververica.com 

Cc: user , Yang Wang , Austin 
Cawley-Edwards 
Subject: Re: [External] : Re: StopWithSavepoint() method doesn't work in Java 
based flink native k8s operator
Hello All,

For Logging issue:
Hi Austin,

This is my full code implementation [1]; I just removed the credential-related
things. The operator definition can be found here [2]. You can also
check other parts if you find any problems.
The operator uses log4j2, and there is a log appender for the operator
[3]; the operator pod can indeed print out logs. But for the flink


Re: Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread vishalovercome
In one of my jobs, windowing is the costliest operation, while upstream and
downstream operators are not as resource intensive. There's another operator
in this job that communicates with internal services. This has high
parallelism as well, but not as much as that of the windowing operation.
Running all operators with the same parallelism as the windowing operation
would choke some of our internal services, as we'd be consuming from our source
at a rate much higher than what those services can handle. Thus our
sources, sinks, and validation/monitoring-related operators have very low
parallelism, while one operator has high parallelism and another has even higher
parallelism.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [External] : Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-04 Thread Fuyao Li
Hello All,

For Logging issue:
Hi Austin,

This is my full code implementation [1]; I just removed the credential-related
things. The operator definition can be found here [2]. You can also
check other parts if you find any problems.
The operator uses log4j2, and there is a log appender for the operator
[3]; the operator pod can indeed print out logs. But in the Flink
application JM and TM pods, I see the errors mentioned earlier: the sed errors
and "ERROR StatusLogger No Log4j 2 configuration file found".

I used to use log4j for the Flink application; to avoid potential incompatibility
issues, I have already upgraded the Flink application's POM to use log4j2, but
the logging problem still exists.

This is my log4j2.properties file for the Flink application [6], and these are
the logging-related POM dependencies for the Flink application [7].

The logs are printed correctly during a normal native k8s deployment and during
IDE debugging; only with the operator does it not work. Could this be caused by
a class namespace conflict, since I introduced the Presto jar into the Flink
distribution? This is my Dockerfile to build the Flink application jar [5].

Please share your idea on this.

For the stopWithSavepoint issue:
just to note, I understand the cancel command (cancelWithSavepoint()) is a
deprecated feature, and it may not guarantee exactly-once semantics and could
give inconsistent results, e.g. around timer-related things? Please correct me if
I am wrong. The code that works with cancelWithSavepoint() is shared in [4] below.
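
For reference, these are the two client calls being compared, roughly (a minimal
sketch against the ClusterClient interface; error handling and the savepoint
directory are simplified):

```java
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;

public class SavepointCalls {

    // stop-with-savepoint: take a savepoint and terminate the job
    // (sources stop emitting first, so the savepoint reflects the job's final state).
    static String stop(ClusterClient<?> client, JobID jobId, String savepointDir) throws Exception {
        CompletableFuture<String> savepointPath =
                client.stopWithSavepoint(jobId, false, savepointDir);
        return savepointPath.get(); // path of the completed savepoint
    }

    // cancel-with-savepoint (deprecated): take a savepoint, then cancel the job.
    static String cancel(ClusterClient<?> client, JobID jobId, String savepointDir) throws Exception {
        return client.cancelWithSavepoint(jobId, savepointDir).get();
    }
}
```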


[1] https://github.com/FuyaoLi2017/flink-native-kubernetes-operator
[2] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/deploy/flink-native-k8s-operator.yaml
[3] 
https://github.com/FuyaoLi2017/flink-native-kubernetes-operator/blob/master/src/main/resources/log4j2.properties
[4] https://pastebin.ubuntu.com/p/tcxT2FwPRS/
[5] https://pastebin.ubuntu.com/p/JTsHygsTP6/
[6] https://pastebin.ubuntu.com/p/2wgdcxVfSy/
[7] https://pastebin.ubuntu.com/p/Sq8xRjQyVY/


Best,
Fuyao

From: Austin Cawley-Edwards 
Date: Tuesday, May 4, 2021 at 14:47
To: matth...@ververica.com 
Cc: Fuyao Li , user , Yang Wang 
, Austin Cawley-Edwards 
Subject: [External] : Re: StopWithSavepoint() method doesn't work in Java based 
flink native k8s operator
Hey all,

Thanks for the ping, Matthias. I'm not super familiar with the details of @Yang 
Wang's operator, to be honest :(. Can you share 
some of your FlinkApplication specs?

For the `kubectl logs` command, I believe that just reads stdout from the 
container. Which logging framework are you using in your application and how 
have you configured it? There's a good guide for configuring the popular ones 
in the Flink docs[1]. For instance, if you're using the default Log4j 2 
framework you should configure a ConsoleAppender[2].

Hope that helps a bit,
Austin

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/advanced/logging/
[2]: 
https://logging.apache.org/log4j/2.x/manual/appenders.html#ConsoleAppender

On Tue, May 4, 2021 at 1:59 AM Matthias Pohl <matth...@ververica.com> wrote:
Hi Fuyao,
sorry for not replying earlier. The stop-with-savepoint operation shouldn't 
only suspend but terminate the job. Is it that you might have a larger state 
that makes creating the savepoint take longer? Even though, considering that 
you don't experience this behavior with your 2nd solution, I'd assume that we 
could ignore this possibility.

I'm gonna add Austin to the conversation as he worked with k8s operators as 
well already. Maybe, he can also give you more insights on the logging issue 
which would enable us to dig deeper into what's going on with 
stop-with-savepoint.

Best,
Matthias

On Tue, May 4, 2021 at 4:33 AM Fuyao Li <fuyao...@oracle.com> wrote:
Hello,

Update:
I think stopWithSavepoint() only suspends the job; it doesn’t actually terminate
(./bin/flink cancel) the job. I switched to cancelWithSavepoint() and it works
here.

Maybe stopWithSavepoint() should only be used to update configurations like
parallelism? For updating the image, it seems unsuitable; please
correct me if I am wrong.

For the log issue, I am still a bit confused. Why is it not available in
kubectl logs? How should I get access to it?

Thanks.
Best,
Fuyao

From: Fuyao Li <fuyao...@oracle.com>
Date: Sunday, May 2, 2021 at 00:36
To: user <user@flink.apache.org>, Yang Wang <danrtsey...@gmail.com>
Subject: [External] : Re: StopWithSavepoint() method doesn't work in Java based 
flink

Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-04 Thread Austin Cawley-Edwards
Hey all,

Thanks for the ping, Matthias. I'm not super familiar with the details of @Yang
Wang's operator, to be honest :(. Can you share
some of your FlinkApplication specs?

For the `kubectl logs` command, I believe that just reads stdout from the
container. Which logging framework are you using in your application and
how have you configured it? There's a good guide for configuring the
popular ones in the Flink docs[1]. For instance, if you're using the
default Log4j 2 framework you should configure a ConsoleAppender[2].

Hope that helps a bit,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/advanced/logging/
[2]:
https://logging.apache.org/log4j/2.x/manual/appenders.html#ConsoleAppender

On Tue, May 4, 2021 at 1:59 AM Matthias Pohl  wrote:

> Hi Fuyao,
> sorry for not replying earlier. The stop-with-savepoint operation
> shouldn't only suspend but terminate the job. Is it that you might have a
> larger state that makes creating the savepoint take longer? Even though,
> considering that you don't experience this behavior with your 2nd solution,
> I'd assume that we could ignore this possibility.
>
> I'm gonna add Austin to the conversation as he worked with k8s operators
> as well already. Maybe, he can also give you more insights on the logging
> issue which would enable us to dig deeper into what's going on with
> stop-with-savepoint.
>
> Best,
> Matthias
>
> On Tue, May 4, 2021 at 4:33 AM Fuyao Li  wrote:
>
>> Hello,
>>
>>
>>
>> Update:
>>
>> I think stopWithSavepoint() only suspend the job. It doesn’t actually
>> terminate (./bin/flink cancel) the job. I switched to cancelWithSavepoint()
>> and it works here.
>>
>>
>>
>> Maybe stopWithSavepoint() should only be used to update the
>> configurations like parallelism? For updating the image, this seems to be
>> not suitable, please correct me if I am wrong.
>>
>>
>>
>> For the log issue, I am still a bit confused. Why it is not available in
>> kubectl logs. How should I get access to it?
>>
>>
>>
>> Thanks.
>>
>> Best,
>>
>> Fuyao
>>
>>
>>
>> *From: *Fuyao Li 
>> *Date: *Sunday, May 2, 2021 at 00:36
>> *To: *user , Yang Wang 
>> *Subject: *[External] : Re: StopWithSavepoint() method doesn't work in
>> Java based flink native k8s operator
>>
>> Hello,
>>
>>
>>
>> I noticed that first triggering a savepoint and then deleting the deployment
>> might cause duplicate data, which could hurt semantic correctness. Please
>> give me some hints on how to make stopWithSavepoint() work correctly with
>> the Fabric8io Java k8s client to perform this image update operation. Thanks!
>>
>>
>>
>> Best,
>>
>> Fuyao
>>
>>
>>
>>
>>
>>
>>
>> *From: *Fuyao Li 
>> *Date: *Friday, April 30, 2021 at 18:03
>> *To: *user , Yang Wang 
>> *Subject: *[External] : Re: StopWithSavepoint() method doesn't work in
>> Java based flink native k8s operator
>>
>> Hello Community, Yang,
>>
>>
>>
>> I have one more question about logging. I also noticed that if I run the
>> kubectl logs command against the JM, the pods provisioned by the operator don’t
>> print the internal Flink logs. I can only get
>> something like the logs below; no actual Flink logs are printed here… Where
>> can I find the path to the logs? Maybe use a sidecar container to get them
>> out? How can I get the logs without checking the Flink WebUI? Also, the sed
>> error confuses me here. In fact, the application is already up and
>> running correctly if I access the WebUI through Ingress.
>>
>>
>>
>> Reference:
>> https://github.com/wangyang0918/flink-native-k8s-operator/issues/4
>> 
>>
>>
>>
>>
>>
>> [root@bastion deploy]# kubectl logs -f flink-demo-594946fd7b-822xk
>>
>>
>>
>> sed: couldn't open temporary file /opt/flink/conf/sedh1M3oO: Read-only
>> file system
>>
>> sed: couldn't open temporary file /opt/flink/conf/sed8TqlNR: Read-only
>> file system
>>
>> /docker-entrypoint.sh: line 75: /opt/flink/conf/flink-conf.yaml:
>> Read-only file system
>>
>> sed: couldn't open temporary file /opt/flink/conf/sedvO2DFU: Read-only
>> file system
>>
>> /docker-entrypoint.sh: line 88: /opt/flink/conf/flink-conf.yaml:
>> Read-only file system
>>
>> /docker-entrypoint.sh: line 90: /opt/flink/conf/flink-conf.yaml.tmp:
>> Read-only file system
>>
>> Start command: $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH
>> -Xmx3462817376 -Xms3462817376 -XX:MaxMetaspaceSize=268435456
>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
>> -D jobmanager.memory.off-heap.size=134217728b -D
>> jobmanager.memory.jvm-overhead.min=429496736b -D
>> jobmanager.memory.jvm-metaspace.size=268435456b -D
>> jobmanager.memory.heap.size=3462817376b -D
>> jobmanager.memory.jvm-overhead.max=429496736b
>>
>> ERROR StatusLogger No Log4j 2 configuration file found. Using d

Re: remote task manager netty exception

2021-05-04 Thread Yichen Liu
Chiming in here since I work with Sihan.

Roman, there aren't many logs beyond this WARN; in fact it should be an ERROR
since it fails our job and the job has to restart.

Here is a fresh example of the "Sending the partition request to 'null'
failed." exception. The only log we see before the exception was:

timestamp="2021-05-04 14:04:33,014", level="INFO", thread="Latest Billing
Info Operator -> (Filter, Filter) (55/80)#12", class="
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend", method="
cleanInstanceBasePath(line:462)", message="Closed RocksDB State Backend.
Cleaning up RocksDB working directory
/tmp/flink-io-9570aace-eec0-4dd9-867f-22a7d367282e/job__op_KeyedProcessOperator_6cf741936dcd5ce8199875ace1f5638a__55_80__uuid_b8ef0675-4355-4b99-9f03-77d1eb713bf4."

timestamp="2021-05-04 14:04:33,633", level="WARN", thread="Latest Billing
Info Operator -> (Filter, Filter) (55/80)#12",
class="org.apache.flink.runtime.taskmanager.Task",
method="transitionState(line:1033)", message="Latest Billing Info Operator
-> (Filter, Filter) (55/80)#12 (0f9caefb1122609e3337f2537f7324c3) switched
from RUNNING to FAILED."
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
Sending the partition request to 'null' failed. at
org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137)
~[flink-dist_2.12-1.12.2.jar:1.12.2] at
org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125)
~[flink-dist_2.12-1.12.2.jar:1.12.2] at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
~[flink-dist_2.12-1.12.2.jar:1.12.2]

but this looks more like a consequence than the cause of the exception.

Note this seems to happen pretty consistently when one of our TMs is lost.
Could it be that the partition info somehow isn't up to date on the TM when the
job is restarting?

Also note that we have pretty huge state: each TM holds around 130 GB of state,
and the TMs are configured with 10 GB of memory and 2700m CPU (in k8s units).

On Mon, May 3, 2021 at 8:29 AM Roman Khachatryan  wrote:

> Hi,
>
> I see that JM and TM failures are different (from TM, it's actually a
> warning). Could you please share the ERROR message from TM?
>
> Have you tried increasing taskmanager.network.retries [1]?
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#taskmanager-network-retries
>
> Regards,
> Roman
>
> On Fri, Apr 30, 2021 at 11:55 PM Sihan You  wrote:
> >
> > Hi,
> >
> > We are experiencing some netty issue with our Flink cluster, which we
> couldn't figure the cause.
> >
> > Below is the stack trace of exceptions from TM's and JM's perspectives.
> we have 85 TMs and one JM in HA mode. The strange thing is that only 23 of
> the TM are complaining about the connection issue. When this exception
> occurs, the TM they are complaining about is still up and live. this will
> cause our job to be stuck in the restart loop for a couple of hours then
> back to normal.
> >
> > We are using HDFS as the state backend and the checkpoint dir.
> > the application is running in our own data center and in Kubernetes as a
> standalone job.
> >
> >
> > ## Job Graph
> >
> > the job graph is like this:
> >   source 1.1 (parallelism 5)   --\
> >                                    union --\
> >   source 1.2 (parallelism 80)  --/          \
> >                                               connect -> sink
> >   source 2.1 (parallelism 5)   --\           /
> >                                    union ---/
> >   source 2.2 (parallelism 80)  --/
> >
> >
> > ## JM's Stacktrace
> >
> > ```
> > message="PLI Deduplicate Operator (60/80)
> (5d2b9fba2eaeae452068bc53e4232d0c) switched from RUNNING to FAILED on
> 100.98.115.117:6122-924d20 @ 100.98.115.117
> (dataPort=41245)."org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> Sending the partition request to 'null' failed. at
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2] at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2] at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2] at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2] at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2] at
> org.apache.flink.shaded.netty4.io.netty.util.concu

Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-04 Thread Matthias Pohl
Yes, thanks for managing the release, Dawid & Guowei! +1

On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot 
wrote:

> Congrats to everyone involved !
>
> Best
>
> Etienne
> On 03/05/2021 15:38, Dawid Wysakowicz wrote:
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.13.0.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2021/05/03/release-1.13.0.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Guowei & Dawid
>
>


Re: java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.R

2021-05-04 Thread Matthias Pohl
In Flink 1.11, there were some changes in how the Flink clients dependency is
bundled [1]. The error you're seeing is likely due to the flink-clients
module not being on the classpath anymore. Can you check your dependencies
and update the pom.xml as suggested in [1]?

Matthias

[1] https://flink.apache.org/news/2020/12/18/release-1.11.3.html

On Tue, May 4, 2021 at 1:00 PM Ragini Manjaiah 
wrote:

>
> As you suggested I downloaded  flink 1.11.3
> to submit a flink job  . The actual application is developed in flink
> 1.8.1.
> Since the Hadoop cluster is 3.2.0 apache I downloaded flink 1.11.3 (
> flink-1.11.3-bin-scala_2.11.tgz) and tried  to submit the job.
> while submitting facing the below mentioned  exception . I have set the
> HADOOP parameters :
>
>
> export HADOOP_CONF_DIR=/etc/hadoop/conf
>
> export HADOOP_CLASSPATH=`hadoop classpath`
>
>
> Is there any changes I need to do it the pom file to overcome this
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: No ExecutorFactory found to execute the application.
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>
> Caused by: java.lang.IllegalStateException: No ExecutorFactory found to
> execute the application.
>
> at
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1809)
>
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
>
> at org.sapphire.appspayload.StreamingJob.main(StreamingJob.java:214)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>
> On Tue, May 4, 2021 at 11:47 AM Ragini Manjaiah 
> wrote:
>
>> Thank you for the clarification.
>>
>> On Mon, May 3, 2021 at 6:57 PM Matthias Pohl 
>> wrote:
>>
>>> Hi Ragini,
>>> this is a dependency version issue. Flink 1.8.x does not support Hadoop
>>> 3, yet. The support for Apache Hadoop 3.x was added in Flink 1.11 [1]
>>> through FLINK-11086 [2]. You would need to upgrade to a more recent Flink
>>> version.
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://flink.apache.org/news/2020/07/06/release-1.11.0.html#important-changes
>>> [2] https://issues.apache.org/jira/browse/FLINK-11086
>>>
>>> On Mon, May 3, 2021 at 3:05 PM Ragini Manjaiah <
>>> ragini.manja...@gmail.com> wrote:
>>>
 Hi Team,
 I have Flink 1.8.1 and  hadoop open source 3.2.0 . My flink jobs run
 without issues on HDP 2.5.3 version. when run on hadoop open source 3.2.0
 encountering the below mentioned exception .
 I have set hadoop
 export HADOOP_CONF_DIR=/etc/hadoop/conf
 export HADOOP_CLASSPATH=`hadoop classpath`


 SLF4J: Class path contains multiple SLF4J bindings.

 SLF4J: Found binding in
 [jar:file:/home_dir/svsap61/flink-1.8.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]

 SLF4J: Found binding in
 [jar:file:/usr/share/hadoop-tgt-3.2.0.1.0.0.11/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
 explanation.

 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

 java.lang.IllegalAccessError: tried to access method
 org.apache.hadoop.yarn.clien

Re: Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread David Anderson
Could you describe a situation in which hand-tuning the parallelism of
individual operators produces significantly better throughput than the
default approach? I think it would help this discussion if we could have a
specific use case in mind where this is clearly better.

Regards,
David

On Tue, May 4, 2021 at 12:29 PM vishalovercome  wrote:

> Forgot to add one more question - 7. If maxParallelism needs to be set to
> control parallelism, then wouldn't that mean that we wouldn't ever be able
> to take a savepoint and rescale beyond the configured maxParallelism? This
> would mean that we can never achieve hand tuned resource efficient. I will
> need to set maxParallelism beyond the current parallelism and given current
> tendency to allocate same number of sub-tasks for each operator, I will
> inevitably end up with several under utilized operators.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [ANNOUNCE] Apache Flink 1.12.3 released

2021-05-04 Thread Piotr Nowojski
Yes, thanks a lot for driving this release Arvid :)

Piotrek

On Thu, Apr 29, 2021 at 19:04 Till Rohrmann  wrote:

> Great to hear. Thanks a lot for being our release manager Arvid and to
> everyone who has contributed to this release!
>
> Cheers,
> Till
>
> On Thu, Apr 29, 2021 at 4:11 PM Arvid Heise  wrote:
>
>> Dear all,
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.12.3, which is the third bugfix release for the Apache Flink
>> 1.12 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2021/04/29/release-1.12.3.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349691
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>>
>> Your friendly release manager Arvid
>>
>


Re: Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread vishalovercome
Forgot to add one more question - 7. If maxParallelism needs to be set to
control parallelism, then wouldn't that mean that we wouldn't ever be able
to take a savepoint and rescale beyond the configured maxParallelism? This
would mean that we can never achieve hand-tuned resource efficiency. I will
need to set maxParallelism beyond the current parallelism, and given the current
tendency to allocate the same number of sub-tasks for each operator, I will
inevitably end up with several under-utilized operators.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-04 Thread Etienne Chauchot

Congrats to everyone involved !

Best

Etienne

On 03/05/2021 15:38, Dawid Wysakowicz wrote:
The Apache Flink community is very happy to announce the release of
Apache Flink 1.13.0.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the
improvements for this bugfix release:
https://flink.apache.org/news/2021/05/03/release-1.13.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Guowei & Dawid


Re: Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread vishalovercome
Some questions about the adaptive scheduler documentation - "If new slots become
available the job will be scaled up again, up to the configured
parallelism".

Does parallelism refer to maxParallelism or parallelism? I'm guessing it's
the latter, because the doc later mentions - "In Reactive Mode (see above)
the configured parallelism is ignored and treated as if it was set to
infinity, letting the job always use as many resources as possible."

The other question I have is along the same lines as the one mentioned earlier
- what strategy is used to allocate sub-tasks? Is it the ratio of
parallelism that's configured, or does it try to achieve as much operator
chaining as possible?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread vishalovercome
Thank you very much for the new release that makes auto-scaling possible. I'm
currently running multiple Flink jobs and I've hand-tuned the parallelism of
each of the operators to achieve the best throughput. I would much rather
use the auto-scaling capabilities of Flink than have to hand-tune my jobs,
but it seems there are a few gaps:

1. Setting max parallelism seems to be the only user-controlled knob at the
moment (see the sketch after this list for the knobs I mean). As Flink tries to
achieve operator chaining by launching the same number of sub-tasks for each
operator, I'm afraid the current auto-scaling will be very inefficient. At a
minimum, we need to support user-provided ratios that will be used to distribute
sub-tasks among operators. E.g. O1:O2 = 4:1 would mean that 4 sub-tasks of O1
should be started for each sub-task of O2.

2. Allow for external system to set parallelism of operators. Perhaps job
manager's rest api can be extended to support such scaling requests

3. The doc says that local recovery doesn't work. This makes sense when a
restart is due to a scaling action but I couldn't quite understand why that
needs to be the case when a task manager is recovering from a crash

4. Is there any metric that allows us to distinguish between restart due to
scaling as opposed to restart due to some other reason? Based on the section
on limitations, there isn't but it would be good to add this as people will
eventually want to monitor and alert based on restarts due to failures
alone.

5. Suppose the number of containers are fixed and the job is running. Will
flink internally rebalance by adding sub-tasks of one operator and removing
sub-tasks of another? This could be driven by back-pressure for instance.
The doc doesn't mention this so I'm assuming that current scaling is
designed to maximize operator chaining. However, it does make sense to
incorporate back-pressure to rebalance. Should this be how future versions
of auto-scaling will work then we'll need to have some toggles to avoid
restart loops.

6. How is the implementation different from taking a savepoint and manually
rescaling? Are there any operator-specific gotchas that we should watch out
for? For instance, we use the AsyncIO operator and wanted to know how in-flight
requests to a database would be handled when its parallelism changes.
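
For concreteness, these are the knobs I am referring to in point 1 - a minimal
sketch using the DataStream API (the operator names and numbers are made up):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismKnobs {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-wide upper bound for rescaling (also the number of key groups for keyed state).
        env.setMaxParallelism(128);

        env.fromElements("a", "b", "c")
                // Per-operator upper bound: cap a cheap operator lower than the expensive one.
                .map(String::trim).name("cheap-map").setMaxParallelism(16)
                .map(String::toUpperCase).name("expensive-op").setMaxParallelism(128)
                .print();

        env.execute("parallelism-knobs-sketch");
    }
}
```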

Once again, thank you for your continued support!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Setup of Scala/Flink project using Bazel

2021-05-04 Thread Austin Cawley-Edwards
Great! Feel free to post back if you run into anything else or come up with
a nice template – I agree it would be a nice thing for the community to
have.

Best,
Austin

On Tue, May 4, 2021 at 12:37 AM Salva Alcántara 
wrote:

> Hey Austin,
>
> There was no special reason for vendoring using `bazel-deps`, really. I
> just
> took another project as a reference for mine and that project was already
> using `bazel-deps`. I am going to give `rules_jvm_external` a try, and
> hopefully I can make it work!
>
> Regards,
>
> Salva
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Interacting with flink-jobmanager via CLI in separate pod

2021-05-04 Thread Robert Cullen
I have a Flink cluster running in Kubernetes, just the basic installation
with one JobManager and two TaskManagers. I want to interact with it via the
command line from a separate container, i.e.:

root@flink-client:/opt/flink# ./bin/flink list --target
kubernetes-application -Dkubernetes.cluster-id=job-manager

How do you interact with it via the CLI from within the same Kubernetes cluster
(not from the desktop)? This is the exception:


 The program finished with the following exception:

java.lang.RuntimeException:
org.apache.flink.client.deployment.ClusterRetrieveException: Could not
get the rest endpoint of job-manager
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:103)
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:145)
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:67)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1001)
at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:427)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.client.deployment.ClusterRetrieveException:
Could not get the rest endpoint of job-manager
... 9 more
root@flink-client:/opt/flink#
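
For context, the operation I am after is essentially the programmatic equivalent
of `flink list` against the JobManager's REST endpoint, roughly like this sketch
(the service name and port here are assumptions):

```java
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;

public class ListJobsSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "flink-jobmanager"); // assumed k8s service name
        conf.setInteger(RestOptions.PORT, 8081);                 // assumed REST port

        try (RestClusterClient<String> client = new RestClusterClient<>(conf, "standalone")) {
            // List all jobs known to the JobManager and print their status.
            for (JobStatusMessage job : client.listJobs().get()) {
                System.out.println(job.getJobId() + " : " + job.getJobState());
            }
        }
    }
}
```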

-- 
Robert Cullen
240-475-4490


Presence of Jars in Flink reg security

2021-05-04 Thread Prasanna kumar
Hi Flinksters,

Our repo, which is a Maven-based Java (Flink) project, went through an SCA
scan using the WhiteSource tool, and the following are the HIGH severity issues
reported. The vulnerable jars are not found when we build the
dependency tree of the project.

Could anyone let us know if Flink uses these anywhere?

+----------------------+----------+
| Library              | Severity |
+======================+==========+
| xercesImpl-2.9.1.jar | HIGH     |
+----------------------+----------+
- Artifact ID: xercesImpl
- Group ID: xerces
- Library Version: 2.9.1
- Library Path:
/var/lib/jenkins/workspace/branch/latest/?/.m2/repository/xerces/xercesImpl/2.9.1/xercesImpl-2.9.1.jar
- Dependency: None
- Type: MAVEN_ARTIFACT
- Description: XMLscanner.java in Apache Xerces2 Java Parser before
2.12.0, as used in the Java Runtime Environment (JRE) in IBM Java 5.0
before 5.0 SR16-FP3, 6 before 6 SR14, 6.0.1 before 6.0.1 SR6, and 7
before 7 SR5 as well as Oracle Java SE 7u40 and earlier, Java SE 6u60
and earlier, Java SE 5.0u51 and earlier, JRockit R28.2.8 and earlier,
JRockit R27.7.6 and earlier, Java SE Embedded 7u40 and earlier, and
possibly other products allows remote attackers to cause a denial of
service via vectors related to XML attribute names.
- Suggested Fix: Upgrade to version xerces:xercesImpl:Xerces-J_2_12_0


+------------------------+----------+
| Library                | Severity |
+========================+==========+
| struts-core-1.3.8.jar  | HIGH     |
+------------------------+----------+
- Artifact ID: struts-core
- Group ID: org.apache.struts
- Library Version: 1.3.8
- Library Path:
/var/lib/jenkins/workspace/branchlatest/?/.m2/repository/org/apache/struts/struts-core/1.3.8/struts-core-1.3.8.jar
- Dependency: None
- Type: MAVEN_ARTIFACT
- Description: ActionServlet.java in Apache Struts 1 1.x through
1.3.10 does not properly restrict the Validator configuration, which
allows remote attackers to conduct cross-site scripting (XSS) attacks
or cause a denial of service via crafted input, a related issue to
CVE-2015-0899.
- Suggested Fix: Replace or update the following file: 
ActionServlet.java

+----------------------+----------+
| Library              | Severity |
+======================+==========+
| plexus-utils-3.0.jar | HIGH     |
+----------------------+----------+
- Artifact ID: plexus-utils
- Group ID: org.codehaus.plexus
- Library Version: 3.0
- Library Path:
/var/lib/jenkins/workspace/branchlatest/?/.m2/repository/org/codehaus/plexus/plexus-utils/3.0/plexus-utils-3.0.jar
- Dependency: None
- Type: MAVEN_ARTIFACT
- Description: Security vulnerability found in plexus-utils before
3.0.24. XML injection found in XmlWriterUtil.java.
- Suggested Fix: Upgrade to version 3.0.24

Thanks,

Prasanna.


Re: Guaranteeing event order in a KeyedProcessFunction

2021-05-04 Thread Miguel Araújo
Hi Timo,

Thanks for your answer. I think I wasn't clear enough in my initial
message, so let me give more details.

The stream is not keyed by timestamp, it's keyed by a custom field (e.g.,
user-id) and then fed into a KeyedProcessFunction. I want to process all
events for a given user in order, before sending them downstream for
further processing in other operators. I don't want to hold events longer
than needed, hence using the watermark to signal which events can be
processed.
I don't think your suggestion of using a ListState would work, because we
would effectively have one list per user. That would imply (among other
things) that an event could only be processed when a new event for the same
user arrives, which would not only imply a (potentially) huge latency, but
also data leakage. Not to mention that the events being sent could easily
be considered late-events to the downstream operators.
The idea of keying by timestamp was an "evolution" of the ListState
suggestion, where events-to-be-later-processed would be kept sorted in the
map (which is what would be keyed by timestamp). We could iterate the map
to process the events, instead of fetching the full list and sorting it to
process the events in order. I don't think this solves any of the problems
mentioned above, so I think that mentioning it only raised confusion.

Regarding global event-time order, that's not really what I'm after. I only
need event-time order per key, but I want to process the event as soon as
possible, constrained by knowing that it is "safe" to do so because no
event with a smaller timestamp for this key is yet to come.

So, rephrasing my question as I'm not sure that part was clear in the
initial message, here is the idea:
- keeping one priority queue (ordered by timestamp) in each
KeyedProcessFunction instance. Therefore, each priority queue would store
events for multiple keys.
- when an event arrives, we push it to the queue and then process events
(updating state and sending them downstream) while their timestamp is lower
than the current watermark.

The question is:
- is this fault tolerant? The priority queue is not state that is managed
by flink, but it should be recoverable on replay.
- is it possible that the events I'm sending downstream become late-events
for a different operator, for some reason? Will they always be sent before
the watermark of the event that originated the processElement() call?
- I would effectively be processing multiple elements (from multiple keys)
in the same call to processElement(). Is there a way to access the state of
different keys?

This doesn't feel like the right approach. Is there an operator more
suitable than a KeyedProcessFunction which would allow me to handle the
state for multiple keys in this task manager? Should I register a timer to
trigger on the event timestamp instead? I believe timers trigger on
watermarks, so that could theoretically work, although it feels a little
weird. After all, what I want is just to buffer events so that they are
only processed when the watermark has caught up to them.
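
For reference, here is a minimal sketch of the timer-based variant I am describing
(a hypothetical Event POJO keyed by user id; for brevity it assumes at most one
event per key and timestamp):

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class OrderedEmitter
        extends KeyedProcessFunction<String, OrderedEmitter.Event, OrderedEmitter.Event> {

    /** Hypothetical event type: keyed by user id, carrying its own event timestamp. */
    public static class Event {
        public String userId;
        public long timestamp;
        public String payload;
    }

    // Keyed state: events waiting for the watermark to catch up, indexed by timestamp.
    private transient MapState<Long, Event> pending;

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("pending", Types.LONG, Types.POJO(Event.class)));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        // Assumption for brevity: at most one event per (key, timestamp).
        pending.put(event.timestamp, event);
        // Event-time timers fire when the watermark passes the registered timestamp.
        ctx.timerService().registerEventTimeTimer(event.timestamp);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        Event buffered = pending.get(timestamp);
        if (buffered != null) {
            out.collect(buffered); // emitted from the timer, i.e. only once the watermark caught up
            pending.remove(timestamp);
        }
    }
}
```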

Thanks

Timo Walther  escreveu no dia sexta, 30/04/2021 à(s)
12:05:

> Hi Miguel,
>
> your initial idea sounds not too bad but why do you want to key by
> timestamp? Usually, you can simply key your stream by a custom key and
> store the events in a ListState until a watermark comes in.
>
> But if you really want to have some kind of global event-time order, you
> have two choices:
>
> - either a single operator with parallelism 1 that performs the ordering
> - or you send the every event to every operator using the broadcast
> state pattern [1]
>
> It is guaranteed that watermark will reach the downstream operator or
> sink after all events. Watermarks are synchronized across all parallel
> operator instances. You can store a Map uncheckpointed, but this means
> that you have to ensure the map is initialized again during recovery.
>
> Regards,
> Timo
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>
> On 30.04.21 11:37, Miguel Araújo wrote:
> > Hi everyone,
> >
> > I have a KeyedProcessFunction whose events I would like to process in
> > event-time order.
> > My initial idea was to use a Map keyed by timestamp and, when a new
> > event arrives, iterate over the Map to process events older than the
> > current watermark.
> >
> > The issue is that I obviously can't use a MapState, as my stream is
> > keyed, so the map would be scoped to the current key.
> > Is using a "regular" (i.e., not checkpointed) Map an option, given that
> > its content will be recreated by the replay of the events on a restart?
> > Is it guaranteed that the watermark that triggered the processing of
> > multiple events (and their subsequent push downstream) is not sent
> > downstream before these events themselves?
> >
> > Thanks,
> > Miguel
>
>


Re: java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.R

2021-05-04 Thread Ragini Manjaiah
As you suggested, I downloaded Flink 1.11.3 to submit a Flink job. The actual
application is developed in Flink 1.8.1. Since the Hadoop cluster is Apache
3.2.0, I downloaded Flink 1.11.3 (flink-1.11.3-bin-scala_2.11.tgz) and tried to
submit the job. While submitting, I am facing the exception below. I have set
the HADOOP parameters:


export HADOOP_CONF_DIR=/etc/hadoop/conf

export HADOOP_CLASSPATH=`hadoop classpath`


Are there any changes I need to make in the pom file to overcome this?


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: No ExecutorFactory found to execute the application.

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)

at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)

at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)

at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

Caused by: java.lang.IllegalStateException: No ExecutorFactory found to
execute the application.

at
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1809)

at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)

at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)

at org.sapphire.appspayload.StreamingJob.main(StreamingJob.java:214)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)

On Tue, May 4, 2021 at 11:47 AM Ragini Manjaiah 
wrote:

> Thank you for the clarification.
>
> On Mon, May 3, 2021 at 6:57 PM Matthias Pohl 
> wrote:
>
>> Hi Ragini,
>> this is a dependency version issue. Flink 1.8.x does not support Hadoop
>> 3, yet. The support for Apache Hadoop 3.x was added in Flink 1.11 [1]
>> through FLINK-11086 [2]. You would need to upgrade to a more recent Flink
>> version.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://flink.apache.org/news/2020/07/06/release-1.11.0.html#important-changes
>> [2] https://issues.apache.org/jira/browse/FLINK-11086
>>
>> On Mon, May 3, 2021 at 3:05 PM Ragini Manjaiah 
>> wrote:
>>
>>> Hi Team,
>>> I have Flink 1.8.1 and  hadoop open source 3.2.0 . My flink jobs run
>>> without issues on HDP 2.5.3 version. when run on hadoop open source 3.2.0
>>> encountering the below mentioned exception .
>>> I have set hadoop
>>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>> export HADOOP_CLASSPATH=`hadoop classpath`
>>>
>>>
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>
>>> SLF4J: Found binding in
>>> [jar:file:/home_dir/svsap61/flink-1.8.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>
>>> SLF4J: Found binding in
>>> [jar:file:/usr/share/hadoop-tgt-3.2.0.1.0.0.11/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>>
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>>
>>> java.lang.IllegalAccessError: tried to access method
>>> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
>>> from class
>>> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
>>>
>>> at
>>> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
>>>
>>> at
>>> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:188)
>>>
>>> at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:118)
>>>
>>> at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:93)
>>>
>>> at
>>> org.apache.hadoop.yarn.client.ClientRMProxy.cr

Flink : Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.

2021-05-04 Thread Ragini Manjaiah
Hi Team,
I am trying to submit a Flink job with version 1.11.3. The actual application
is developed in Flink 1.8.1. Since the Hadoop cluster is Apache 3.2.0, I
downloaded Flink 1.11.3 (flink-1.11.3-bin-scala_2.11.tgz) and tried to submit
the job. While submitting, I am facing the exception below. I have set the
HADOOP parameters:


export HADOOP_CONF_DIR=/etc/hadoop/conf

export HADOOP_CLASSPATH=`hadoop classpath`


Are there any changes I need to make in the pom file to overcome this?


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: No ExecutorFactory found to execute the application.

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)

at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)

at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)

at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

Caused by: java.lang.IllegalStateException: No ExecutorFactory found to
execute the application.

at
org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1809)

at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)

at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)

at org.sapphire.appspayload.StreamingJob.main(StreamingJob.java:214)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-04 Thread Robert Metzger
Thanks a lot to everybody who has contributed to the release, in particular
the release managers for running the show!


On Tue, May 4, 2021 at 8:54 AM Konstantin Knauf 
wrote:

> Thank you Dawid and Guowei! Great job everyone :)
>
> On Mon, May 3, 2021 at 7:11 PM Till Rohrmann  wrote:
>
>> This is great news. Thanks a lot for being our release managers Dawid and
>> Guowei! And also thanks to everyone who has made this release possible :-)
>>
>> Cheers,
>> Till
>>
>> On Mon, May 3, 2021 at 5:46 PM vishalovercome 
>> wrote:
>>
>>> This is a very big release! Many thanks to the flink developers for their
>>> contributions to making Flink as good a framework that it is!
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
>
> --
>
> Konstantin Knauf | Head of Product
>
> +49 160 91394525
>
>
> Follow us @VervericaData Ververica 
>
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>


Define rowtime on intermediate table field

2021-05-04 Thread Sumeet Malhotra
Hi,

My use case involves reading raw data records from Kafka and processing
them. The records are coming from a database, where a periodic job reads
new rows, packages them into a single JSON object (as described below) and
writes the entire record to Kafka.

{
'id': 'some_id',
'key_a': 'value_a',
'key_b': 'value_b',
'result': {
'columns': [
'col_a',
'col_b',
'col_c',
'col_d'
],
'rows': [
['2021-05-04T05:23:13.953610Z', '655361', '8013', '0'],
['2021-05-04T05:23:13.953610Z', '655362', '4000', '456'],
['2021-05-04T05:23:13.953610Z', '655363', '2', '562'],
...
...
]
}
}

As can be seen, the row time is actually embedded in the `result` object.

What I'm doing at the moment is to run this data through a user defined
table function, which parses the `result` object as a string, and emits
multiple rows that include the timestamp field. This is working fine.

In the next step, I would want to perform windowing on this transformed
data. That requires defining the event time attribute along with the
watermark. As I understand, this can be done either during the initial
table DDL definition or during conversion to a datastream.

Since I extract the timestamp value only after reading from Kafka, how can
I define an event time attribute on the intermediate table that's basically
a result of the user defined table function?

The only solution I can think of at the moment is to write the
intermediate table back to Kafka, and then create a new consumer that reads
from Kafka, where I can define the event time attribute as part of its DDL.
This most likely won't be good performance-wise. Is there any other
way I can define event time on the results of my user-defined table function?
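
For reference, the kind of thing I was hoping might work without the Kafka round
trip is roughly the following (an untested sketch, assuming the Java Table API,
that the UDTF output has columns (id, ts, col_b, col_c, col_d), and that ts has
already been converted to epoch milliseconds; all names are made up):

```java
import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RowtimeOnUdtfOutput {

    // "exploded" is the intermediate table produced by the UDTF, with an
    // epoch-millis BIGINT column "ts" as its second field (assumption).
    public static Table attachRowtime(StreamTableEnvironment tEnv, Table exploded) {
        // 1. Convert the intermediate table to a DataStream of Rows ...
        DataStream<Row> rows = tEnv.toAppendStream(exploded, Row.class);

        // 2. ... assign event timestamps and watermarks on the stream ...
        DataStream<Row> withTimestamps = rows.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((row, previous) -> (Long) row.getField(1)));

        // 3. ... and convert back to a Table, declaring "ts" as a rowtime attribute.
        return tEnv.fromDataStream(
                withTimestamps, $("id"), $("ts").rowtime(), $("col_b"), $("col_c"), $("col_d"));
    }
}
```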

Thanks in advance,
Sumeet