Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-03 Thread Konstantin Knauf
Thank you Dawid and Guowei! Great job everyone :)

On Mon, May 3, 2021 at 7:11 PM Till Rohrmann  wrote:

> This is great news. Thanks a lot for being our release managers Dawid and
> Guowei! And also thanks to everyone who has made this release possible :-)
>
> Cheers,
> Till
>
> On Mon, May 3, 2021 at 5:46 PM vishalovercome  wrote:
>
>> This is a very big release! Many thanks to the Flink developers for their
>> contributions to making Flink as good a framework as it is!
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>

-- 

Konstantin Knauf | Head of Product

+49 160 91394525


Follow us @VervericaData Ververica 


--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
Wehner


Re: Interactive Programming in Flink (Cache operation)

2021-05-03 Thread Matthias Pohl
It is planned for some future release as the corresponding vote [1]
succeeded. I guess the effort has stalled for some time, unfortunately. I'm
gonna add Timo to this conversation. He might have more insights on the
plans to proceed with it.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/RESULT-VOTE-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td45024.html

On Tue, May 4, 2021 at 8:12 AM Jack Kolokasis 
wrote:

> Hi Matthias,
>
> Thank you for your reply. Are you going to include it in future versions?
>
> Best,
> Iacovos
> On 4/5/21 9:10 a.m., Matthias Pohl wrote:
>
> Hi Iacovos,
> unfortunately, it doesn't as the related FLINK-19343 [1] is not resolved,
> yet. The release notes for Flink 1.13 can be found in [2].
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-19343
> [2] https://flink.apache.org/news/2021/05/03/release-1.13.0.html
>
> On Mon, May 3, 2021 at 8:12 PM Jack Kolokasis 
> wrote:
>
>> Hello,
>>
>> Does the new release of Flink 1.13.0 include the cache operation
>> feature
>> (
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>> )?
>>
>> Thank you,
>> Iacovos
>
>


Re: java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.R

2021-05-03 Thread Ragini Manjaiah
Thank you for the clarification.

On Mon, May 3, 2021 at 6:57 PM Matthias Pohl  wrote:

> Hi Ragini,
> this is a dependency version issue. Flink 1.8.x does not support Hadoop 3,
> yet. The support for Apache Hadoop 3.x was added in Flink 1.11 [1] through
> FLINK-11086 [2]. You would need to upgrade to a more recent Flink version.
>
> Best,
> Matthias
>
> [1]
> https://flink.apache.org/news/2020/07/06/release-1.11.0.html#important-changes
> [2] https://issues.apache.org/jira/browse/FLINK-11086
>
> On Mon, May 3, 2021 at 3:05 PM Ragini Manjaiah 
> wrote:
>
>> Hi Team,
>> I have Flink 1.8.1 and open-source Hadoop 3.2.0. My Flink jobs run
>> without issues on HDP 2.5.3. When run on open-source Hadoop 3.2.0,
>> I encounter the exception mentioned below.
>> I have set the Hadoop environment:
>> export HADOOP_CONF_DIR=/etc/hadoop/conf
>> export HADOOP_CLASSPATH=`hadoop classpath`
>>
>>
>> SLF4J: Class path contains multiple SLF4J bindings.
>>
>> SLF4J: Found binding in
>> [jar:file:/home_dir/svsap61/flink-1.8.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>
>> SLF4J: Found binding in
>> [jar:file:/usr/share/hadoop-tgt-3.2.0.1.0.0.11/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>>
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>
>> java.lang.IllegalAccessError: tried to access method
>> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
>> from class
>> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
>>
>> at
>> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
>>
>> at
>> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:188)
>>
>> at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:118)
>>
>> at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:93)
>>
>> at
>> org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
>>
>> at
>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:195)
>>
>> at
>> org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
>>
>> at
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli.getClusterDescriptor(FlinkYarnSessionCli.java:1013)
>>
>> at
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createDescriptor(FlinkYarnSessionCli.java:274)
>>
>> at
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:454)
>>
>> at
>> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:97)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:224)
>>
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>>
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>
>


Re: Interactive Programming in Flink (Cache operation)

2021-05-03 Thread Jack Kolokasis

Hi Matthias,

Thank you for your reply. Are you going to include it in future versions?

Best,
Iacovos

On 4/5/21 9:10 a.m., Matthias Pohl wrote:

Hi Iacovos,
unfortunately, it doesn't as the related FLINK-19343 [1] is not 
resolved, yet. The release notes for Flink 1.13 can be found in [2].


Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-19343 

[2] https://flink.apache.org/news/2021/05/03/release-1.13.0.html 



On Mon, May 3, 2021 at 8:12 PM Jack Kolokasis wrote:


Hello,

Does the new release of Flink 1.13.0 include the cache operation
feature

(https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink

)?

Thank you,
Iacovos



Re: Interactive Programming in Flink (Cache operation)

2021-05-03 Thread Matthias Pohl
Hi Iacovos,
unfortunately, it doesn't as the related FLINK-19343 [1] is not resolved,
yet. The release notes for Flink 1.13 can be found in [2].

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-19343
[2] https://flink.apache.org/news/2021/05/03/release-1.13.0.html

On Mon, May 3, 2021 at 8:12 PM Jack Kolokasis 
wrote:

> Hello,
>
> Does the new release of Flink 1.13.0 include the cache operation
> feature
> (
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> )?
>
> Thank you,
> Iacovos
>


Re: savepoint command in code

2021-05-03 Thread Matthias Pohl
Hi Abdullah,
is there a reason you're not considering triggering the stop-with-savepoint
operation through the REST API [1]? I'm not entirely sure whether I
understand you correctly: ./bin/flink is an executable. Why would you
assume it to be shown as a directory? You would need to provide FLINK_HOME
(the Flink installation directory in which ./bin/flink is located) through some
parameter to access the executable.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-stop
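For reference, a minimal sketch of what that REST call could look like from Java; the host, port, job id, and target directory below are placeholders, and Java 11+ is assumed for java.net.http:

```java
// Hedged sketch: trigger stop-with-savepoint via the REST API [1].
// The returned JSON contains a request-id that can then be polled via
// /jobs/<jobId>/savepoints/<request-id> to obtain the savepoint location.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StopWithSavepointViaRest {
    public static void main(String[] args) throws Exception {
        String jobId = "0000000000000000000000000000000a"; // placeholder job id
        String body = "{\"targetDirectory\": \"file:///tmp/savepoints\", \"drain\": false}";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // e.g. {"request-id":"..."}
    }
}
```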

On Tue, May 4, 2021 at 5:51 AM Abdullah bin Omar <
abdullahbinoma...@gmail.com> wrote:

> Hello,
>
> I am trying to use the savepoint command (./bin/flink savepoint jobid) in
> the code instead of doing it manually in the terminal. The job id can be
> obtained using getjobid(). The problem is defining the path ./bin/flink: it
> cannot be referenced as a directory (probably because it is a Unix executable
> file).
>
> Is there a way to define the path (./bin/flink) in the code? Or is there
> any function to trigger the savepoint from code instead of the manual command?
>
> Thank you
>


Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-03 Thread Matthias Pohl
Hi Fuyao,
sorry for not replying earlier. The stop-with-savepoint operation shouldn't
only suspend the job but terminate it. Could it be that you have a larger
state that makes creating the savepoint take longer? Then again,
considering that you don't experience this behavior with your 2nd solution,
I'd assume that we can ignore this possibility.

I'm gonna add Austin to the conversation as he has already worked with k8s
operators as well. Maybe he can also give you more insights on the logging
issue, which would enable us to dig deeper into what's going on with
stop-with-savepoint.

Best,
Matthias

On Tue, May 4, 2021 at 4:33 AM Fuyao Li  wrote:

> Hello,
>
>
>
> Update:
>
> I think stopWithSavepoint() only suspends the job. It doesn't actually
> terminate (./bin/flink cancel) the job. I switched to cancelWithSavepoint()
> and it works here.
>
>
>
> Maybe stopWithSavepoint() should only be used to update configurations
> like parallelism? For updating the image, it seems unsuitable; please
> correct me if I am wrong.
>
>
>
> For the log issue, I am still a bit confused. Why is it not available in
> kubectl logs? How can I get access to it?
>
>
>
> Thanks.
>
> Best,
>
> Fuyao
>
>
>
> *From: *Fuyao Li 
> *Date: *Sunday, May 2, 2021 at 00:36
> *To: *user , Yang Wang 
> *Subject: *[External] : Re: StopWithSavepoint() method doesn't work in
> Java based flink native k8s operator
>
> Hello,
>
>
>
> I noticed that first triggering a savepoint and then deleting the deployment
> might cause duplicate data. That could harm semantic correctness. Please
> give me some hints on how to make stopWithSavepoint() work correctly with
> the Fabric8io Java k8s client to perform this image update operation.
> Thanks!
>
>
>
> Best,
>
> Fuyao
>
>
>
>
>
>
>
> *From: *Fuyao Li 
> *Date: *Friday, April 30, 2021 at 18:03
> *To: *user , Yang Wang 
> *Subject: *[External] : Re: StopWithSavepoint() method doesn't work in
> Java based flink native k8s operator
>
> Hello Community, Yang,
>
>
>
> I have one more question about logging. I also noticed that if I execute
> the kubectl logs command against the JM, the pods provisioned by the operator
> can't print the internal Flink logs. I can only get something like the
> logs below; no actual Flink logs are printed here… Where can I find the path
> to the logs? Maybe use a sidecar container to get them out? How can I get the
> logs without checking the Flink WebUI? Also, the sed errors confuse me here.
> In fact, the application is already up and running correctly if I access the
> WebUI through Ingress.
>
>
>
> Reference:
> https://github.com/wangyang0918/flink-native-k8s-operator/issues/4
> 
>
>
>
>
>
> [root@bastion deploy]# kubectl logs -f flink-demo-594946fd7b-822xk
>
>
>
> sed: couldn't open temporary file /opt/flink/conf/sedh1M3oO: Read-only
> file system
>
> sed: couldn't open temporary file /opt/flink/conf/sed8TqlNR: Read-only
> file system
>
> /docker-entrypoint.sh: line 75: /opt/flink/conf/flink-conf.yaml: Read-only
> file system
>
> sed: couldn't open temporary file /opt/flink/conf/sedvO2DFU: Read-only
> file system
>
> /docker-entrypoint.sh: line 88: /opt/flink/conf/flink-conf.yaml: Read-only
> file system
>
> /docker-entrypoint.sh: line 90: /opt/flink/conf/flink-conf.yaml.tmp:
> Read-only file system
>
> Start command: $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH
> -Xmx3462817376 -Xms3462817376 -XX:MaxMetaspaceSize=268435456
> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
> -D jobmanager.memory.off-heap.size=134217728b -D
> jobmanager.memory.jvm-overhead.min=429496736b -D
> jobmanager.memory.jvm-metaspace.size=268435456b -D
> jobmanager.memory.heap.size=3462817376b -D
> jobmanager.memory.jvm-overhead.max=429496736b
>
> ERROR StatusLogger No Log4j 2 configuration file found. Using default
> configuration (logging only errors to the console), or user
> programmatically provided configurations. Set system property
> 'log4j2.debug' to show Log4j 2 internal initialization logging. See
> https://logging.apache.org/log4j/2.x/manual/configuration.html
> 
> for instructions on how to configure Log4j 2
>
> WARNING: An illegal reflective access operation has occurred
>
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/opt/flink/lib/flink-dist_2.11-1.12.1.jar) to field
> java.util.Properties.serialVersionUID
>
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
>
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access o

Re: Setup of Scala/Flink project using Bazel

2021-05-03 Thread Salva Alcántara
Hey Austin,

There was no special reason for vendoring using `bazel-deps`, really. I just
took another project as a reference for mine and that project was already
using `bazel-deps`. I am going to give `rules_jvm_external` a try, and
hopefully I can make it work!

Regards,

Salva



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


savepoint command in code

2021-05-03 Thread Abdullah bin Omar
Hello,

I am trying to use the savepoint command (./bin/flink savepoint jobid) in
the code instead of doing it manually in the terminal. The job id can be
obtained using getjobid(). The problem is defining the path ./bin/flink: it
cannot be referenced as a directory (probably because it is a Unix executable file).

Is there a way to define the path (./bin/flink) in the code? Or is there
any function to trigger the savepoint from code instead of the manual command?

Thank you
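A hedged sketch of a programmatic alternative, using the JobClient handle returned by executeAsync() rather than shelling out to ./bin/flink; the pipeline and the savepoint directory below are placeholders, and Flink 1.12+ is assumed for fromSequence():

```java
// Hedged sketch: obtain the job id and trigger a savepoint from code via JobClient.
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SavepointFromCode {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, Long.MAX_VALUE).print(); // stand-in long-running pipeline

        JobClient jobClient = env.executeAsync("savepoint-demo");
        System.out.println("Job id: " + jobClient.getJobID());

        // Completes with the savepoint path once the savepoint has been written.
        String savepointPath = jobClient.triggerSavepoint("file:///tmp/savepoints").get();
        System.out.println("Savepoint stored at: " + savepointPath);
    }
}
```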


org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: IOException: 1 time,

2021-05-03 Thread Ragini Manjaiah
Hi,
One of my Flink applications needs to get and put records from HBase for
every event while processing in real time. When there are fewer events, the
application processes them without any issues. When the number of events
increases, we start hitting the exception mentioned below. Can these
exceptions bring down the throughput and start to build lag? What are the
parameters we can tune on the HBase/Flink side to overcome this exception?
We see at least 7000 hits/sec to HBase when load is normal. The
HBase table has 3 region servers.


org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException:
Failed 1 action: IOException: 1 time,
at 
org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:258)
at 
org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$2000(AsyncProcess.java:238)
at 
org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1817)
at 
org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:240)
at 
org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:190)
at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1434)
at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1018)
at org......xx(xxx.java:202)
at 
org......xxx.xxx(xxx.java:144)
at 
org......xxx.(x.java:30)
at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
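For illustration, a hedged sketch of the HBase client settings that are commonly examined for this kind of RetriesExhausted error; the table name and the concrete values are placeholders that need to be sized for the actual cluster:

```java
// Hedged sketch: more forgiving retry/timeout settings plus a BufferedMutator,
// so mutations are flushed in batches instead of one HTable.put() per event.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HBaseClientSetup {
    public static BufferedMutator openMutator() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Give region servers room to recover from short hiccups instead of
        // failing the whole batch immediately (placeholder values).
        conf.setInt("hbase.client.retries.number", 10);
        conf.setLong("hbase.client.pause", 200);
        conf.setLong("hbase.rpc.timeout", 30_000);
        conf.setLong("hbase.client.operation.timeout", 60_000);

        Connection connection = ConnectionFactory.createConnection(conf);
        BufferedMutatorParams params =
                new BufferedMutatorParams(TableName.valueOf("my_table")) // placeholder table
                        .writeBufferSize(4 * 1024 * 1024);
        return connection.getBufferedMutator(params);
    }
}
```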


Re: StopWithSavepoint() method doesn't work in Java based flink native k8s operator

2021-05-03 Thread Fuyao Li
Hello,

Update:
I think stopWithSavepoint() only suspends the job. It doesn't actually terminate
(./bin/flink cancel) the job. I switched to cancelWithSavepoint() and it works
here.

Maybe stopWithSavepoint() should only be used to update configurations like
parallelism? For updating the image, it seems unsuitable; please correct me
if I am wrong.

For the log issue, I am still a bit confused. Why is it not available in
kubectl logs? How can I get access to it?

Thanks.
Best,
Fuyao
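For reference, a hedged sketch of the two client calls being compared above, written against the ClusterClient interface as of Flink 1.12; the exact signatures may differ in other releases and the savepoint directory is a placeholder:

```java
// Hedged sketch: stop-with-savepoint vs. cancel-with-savepoint on ClusterClient.
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;

public class SavepointCalls {
    static String stop(ClusterClient<?> client, JobID jobId) throws Exception {
        // Takes a savepoint and then terminates the job; the boolean controls
        // whether event time is advanced to the end ("drain") before stopping.
        return client.stopWithSavepoint(jobId, false, "file:///tmp/savepoints").get();
    }

    static String cancel(ClusterClient<?> client, JobID jobId) throws Exception {
        // Older variant: takes a savepoint and then cancels the job.
        return client.cancelWithSavepoint(jobId, "file:///tmp/savepoints").get();
    }
}
```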

From: Fuyao Li 
Date: Sunday, May 2, 2021 at 00:36
To: user , Yang Wang 
Subject: [External] : Re: StopWithSavepoint() method doesn't work in Java based 
flink native k8s operator
Hello,

I noticed that first triggering a savepoint and then deleting the deployment might
cause duplicate data. That could harm semantic correctness. Please give me some
hints on how to make stopWithSavepoint() work correctly with the Fabric8io Java
k8s client to perform this image update operation. Thanks!

Best,
Fuyao



From: Fuyao Li 
Date: Friday, April 30, 2021 at 18:03
To: user , Yang Wang 
Subject: [External] : Re: StopWithSavepoint() method doesn't work in Java based 
flink native k8s operator
Hello Community, Yang,

I have one more question about logging. I also noticed that if I execute the
kubectl logs command against the JM, the pods provisioned by the operator can't
print the internal Flink logs. I can only get something like the logs below; no
actual Flink logs are printed here… Where can I find the path to the logs? Maybe
use a sidecar container to get them out? How can I get the logs without checking
the Flink WebUI? Also, the sed errors confuse me here. In fact, the application
is already up and running correctly if I access the WebUI through Ingress.

Reference: 
https://github.com/wangyang0918/flink-native-k8s-operator/issues/4


[root@bastion deploy]# kubectl logs -f flink-demo-594946fd7b-822xk

sed: couldn't open temporary file /opt/flink/conf/sedh1M3oO: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sed8TqlNR: Read-only file 
system
/docker-entrypoint.sh: line 75: /opt/flink/conf/flink-conf.yaml: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedvO2DFU: Read-only file 
system
/docker-entrypoint.sh: line 88: /opt/flink/conf/flink-conf.yaml: Read-only file 
system
/docker-entrypoint.sh: line 90: /opt/flink/conf/flink-conf.yaml.tmp: Read-only 
file system
Start command: $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx3462817376 
-Xms3462817376 -XX:MaxMetaspaceSize=268435456 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint 
-D jobmanager.memory.off-heap.size=134217728b -D 
jobmanager.memory.jvm-overhead.min=429496736b -D 
jobmanager.memory.jvm-metaspace.size=268435456b -D 
jobmanager.memory.heap.size=3462817376b -D 
jobmanager.memory.jvm-overhead.max=429496736b
ERROR StatusLogger No Log4j 2 configuration file found. Using default 
configuration (logging only errors to the console), or user programmatically 
provided configurations. Set system property 'log4j2.debug' to show Log4j 2 
internal initialization logging. See 
https://logging.apache.org/log4j/2.x/manual/configuration.html
 for instructions on how to configure Log4j 2
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner 
(file:/opt/flink/lib/flink-dist_2.11-1.12.1.jar) to field 
java.util.Properties.serialVersionUID
WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release


 The logs stops here, flink applications logs doesn’t get printed here 
anymore-

^C
[root@bastion deploy]# kubectl logs -f flink-demo-taskmanager-1-1
sed: couldn't open temporary file /opt/flink/conf/sedaNDoNR: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/seddze7tQ: Read-only file 
system
/docker-entrypoint.sh: line 75: /opt/flink/conf/flink-conf.yaml: Read-only file 
system
sed: couldn't open temporary file /opt/flink/conf/sedYveZoT: Read-only file 
system
/docker-entrypoint.sh: line 88: /opt/flink/conf/flink-conf.yaml: Read-only file 
system
/docker-entrypoint.sh: line 90: /opt/flink/conf/flink-conf.yaml.tmp: Read-only 
file system
Start command: $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx697932173 
-Xms697932173 -XX:MaxDirectMemorySize=300

Zookeeper or Kubernetes for HA?

2021-05-03 Thread vishalovercome
The Flink docs provide details on setting up HA but don't provide any
recommendations as such. For jobs running in Kubernetes and having a
ZooKeeper deployment, which high-availability option would be more
desirable?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-03 Thread vishalovercome
This is a very big release! Many thanks to the Flink developers for their
contributions to making Flink as good a framework as it is!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Setup of Scala/Flink project using Bazel

2021-05-03 Thread Austin Cawley-Edwards
Hey Salva,

This appears to be a bug in the `bazel-deps` tool, caused by mixing scala
and Java dependencies. The tool seems to use the same target name for both,
and thus produces duplicate targets (one for scala and one for java).

If you look at the dict lines that are reported as conflicting, you'll see
the duplicate "vendor/org/apache/flink:flink_clients" target:

*"vendor/org/apache/flink:flink_clients":
["lang||java","name||//vendor/org/apache/flink:flink_clients",*
...],
*"vendor/org/apache/flink:flink_clients":
["lang||scala:2.12.11","name||//vendor/org/apache/flink:flink_clients",
*...],

Can I ask what made you choose the `bazel-deps` tool instead of the official
bazelbuild/rules_jvm_external[1]? That might be a bit more verbose, but has
better support and supports scala as well.


Alternatively, you might look into customizing the target templates for
`bazel-deps` to suffix targets with the lang? Something like:

_JAVA_LIBRARY_TEMPLATE = """
java_library(
  name = "{name}_java",
..."""

_SCALA_IMPORT_TEMPLATE = """
scala_import(
name = "{name}_scala",
..."""


Best,
Austin

[1]: https://github.com/bazelbuild/rules_jvm_external

On Mon, May 3, 2021 at 1:20 PM Salva Alcántara 
wrote:

> Hi Matthias,
>
> Thanks a lot for your reply. I am already aware of that reference, but it's
> not exactly what I need. What I'd like to have is the typical word count
> (hello world) app migrated from sbt to bazel, in order to use it as a
> template for my Flink/Scala apps.
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink Version 1.11 job savepoint failures

2021-05-03 Thread Rainie Li
It helps.
Thanks Matthias.

Best regards
Rainie

On Mon, May 3, 2021 at 4:25 AM Matthias Pohl  wrote:

> Hi Rainie,
> the savepoint creation failed due to some tasks already being finished. It
> looks like you ran into an issue that was addressed in Flink 1.13 (at least
> partially, as FLINK-21066 [1] is only a subtask of a bigger issue). I'm
> pulling Yun Gao into this thread. Let's see whether Yun
> can confirm that finding.
>
> I hope that helps.
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-21066
>
> On Mon, May 3, 2021 at 9:07 AM Rainie Li  wrote:
>
>> Hi Flink Community,
>>
>> Our flink jobs are in version 1.11 and we use this to trigger savepoint.
>> $ bin/flink savepoint :jobId [:targetDirectory]
>> We can get trigger Id with savepoint path successfully.
>>
>> But we saw these errors by querying savepoint endpoint:
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html#jobs-jobid-savepoints-triggerid
>> e.g. application_id/jobs/job_id/savepoints/trigger_id
>>
>> {
>> *  "*errors*": *[
>> "org.apache.flink.runtime.rest.NotFoundException: Operation not
>> found under key:
>> org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@8893e196\n\tat
>> org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers$StatusHandler.handleRequest(AbstractAsynchronousOperationHandlers.java:167)\n\tat
>> org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.handleRequest(SavepointHandlers.java:193)\n\tat
>> org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:73)\n\tat
>> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:178)\n\tat
>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:81)\n\tat
>> java.util.Optional.ifPresent(Optional.java:159)\n\tat
>> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:46)\n\tat
>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:78)\n\tat
>> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
>> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)\n\tat
>> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)\n\tat
>> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
>> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)\n\tat
>> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractCha

Interactive Programming in Flink (Cache operation)

2021-05-03 Thread Jack Kolokasis

Hello,

Does the new release of Flink 1.13.0 include the cache operation
feature
(https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink)?


Thank you,
Iacovos



Is keyed state supported in PyFlink?

2021-05-03 Thread Sumeet Malhotra
Hi,

Is keyed state [1] supported by PyFlink yet? I can see some code for it in
the Flink master branch, but there's no mention of it in the 1.12 Python
documentation.

Thanks,
Sumeet

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html
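For reference, this is the primitive the question is about, shown with the Java DataStream API; a minimal sketch only, independent of what the PyFlink surface for it ends up looking like:

```java
// Keyed state (ValueState): one counter per key, maintained by Flink.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountPerKey extends KeyedProcessFunction<String, String, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Types.LONG));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();              // scoped to the current key
        long updated = (current == null) ? 1L : current + 1L;
        count.update(updated);
        out.collect(updated);
    }
}
```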


Re: Questions about implementing a flink source

2021-05-03 Thread Arvid Heise
Hi Evan,

1) You are absolutely correct that we would urge users to add new sources
as FLIP-27 and new sinks as FLIP-143. I can provide guidance in both cases.
For FLIP-27 sources, I'd recommend using KafkaSource [1] and FileSource [2]
as a starting point. Especially basing the reader implementation on
SingleThreadMultiplexSourceReaderBase will give you some performance boost
over naive implementations.
It is probably initially overwhelming but there is lots of thought behind
the Source interface. We plan on having better documentation and more
examples in the next months to ease the ramp-up, but it's also kind of a
chicken-and-egg problem.

I can also provide guidance outside of the ML if it's easier.

2) You are right, the currentParallelism is static with respect to the
creation of the SourceReaders. Any change to the parallelism would also
cause a recreation of the readers.
Splits are usually checkpointed alongside the readers. On recovery, the
readers are restored with their old splits. Only when splits cannot be
recovered in the context of a reader (for example downscaling), the splits
would be re-added to the enumerator.

Rebalancing can happen in SplitEnumerator#addReader or #handleSplitRequest.
The Kafka and File sources even use different approaches, with eager and lazy
initialization respectively. Further, you can send arbitrary events between
the enumerator and readers to work out the rebalancing. In theory, you can
also dynamically rebalance splits; however, you lose ordering guarantees of
the messages at the moment (if you have records r1, r2 in this order in
split s and you reassign s, then you may end up with r2, r1 in the sink).

[1]
https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L75-L75
[2]
https://github.com/apache/flink/blob/bf40fa5b8cd5b3a8876f37ab4aa7bf06e8b9f666/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java#L99-L99
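To make the assignment logic more concrete, a hedged sketch (not Pub/Sub Lite specific) of a partition-based enumerator that assigns each partition-split to a fixed reader. It is written against the FLIP-27 Source API as it looked in Flink 1.12/1.13; later releases changed some signatures (e.g. snapshotState now takes a checkpoint id), and all class names are made up for illustration:

```java
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// One partition == one split.
class PartitionSplit implements SourceSplit {
    final int partition;
    PartitionSplit(int partition) { this.partition = partition; }
    @Override public String splitId() { return Integer.toString(partition); }
}

class PartitionEnumerator implements SplitEnumerator<PartitionSplit, List<PartitionSplit>> {
    private final SplitEnumeratorContext<PartitionSplit> context;
    private final List<PartitionSplit> unassigned;

    PartitionEnumerator(SplitEnumeratorContext<PartitionSplit> context,
                        List<PartitionSplit> initialOrRestoredSplits) {
        this.context = context;
        this.unassigned = new ArrayList<>(initialOrRestoredSplits);
    }

    @Override public void start() {}

    @Override
    public void addReader(int subtaskId) {
        // Eager assignment: partition p is owned by reader (p % parallelism).
        // After a rescale the readers are recreated and re-registered, so the
        // ownership is recomputed here with the new currentParallelism().
        Iterator<PartitionSplit> it = unassigned.iterator();
        while (it.hasNext()) {
            PartitionSplit split = it.next();
            if (split.partition % context.currentParallelism() == subtaskId) {
                context.assignSplit(split, subtaskId);
                it.remove();
            }
        }
    }

    @Override
    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        // Unused with eager assignment; lazy (pull-based) assignment would go here.
    }

    @Override
    public void addSplitsBack(List<PartitionSplit> splits, int subtaskId) {
        // Splits that could not be restored with a reader (e.g. after downscaling)
        // come back here and are handed out again as readers re-register.
        unassigned.addAll(splits);
    }

    @Override
    public List<PartitionSplit> snapshotState() {
        return new ArrayList<>(unassigned);
    }

    @Override public void close() {}
}
```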

On Mon, May 3, 2021 at 1:40 AM Evan Palmer  wrote:

> Hello, I’m new to Flink. I’m trying to write a source for Pub/Sub Lite
> which is a partition based Pub/Sub product, and I have a few questions.
>
> 1.
>
> I saw that there are two sets of interfaces used in existing sources: The
> RichSourceFunction, and the set of interfaces from FLIP-27. It seems like
> the Source interfaces are preferred for new sources, but I wanted to be
> sure.
>
> 2.
>
> I’m having a little bit of trouble working out how and when the
> currentParallelism returned by the SplitEnumeratorContext [1] can change,
> and how a source should react to that.
>
> For context, I’m currently thinking about single partitions as “splits”,
> so a source would have an approximately constant number of splits, each of
> which has a potentially unbounded amount of work (at least in continuous
> mode). Each split will be assigned to some SourceReader by the split
> enumerator. If the value of currentParallelism changes, it seems like I’ll
> need to find a way to redistribute my partitions over SourceReaders, or
> else I'll end up with an unbalanced distribution of partitions to
> SourceReaders.
>
> I looked at the docs on elastic scaling [2], and it seems like when the
> parallelism of the source changes, the source will be checkpointed and
> restored. I think this would mean all the SourceReaders get restarted, and
> their splits are returned to the SplitEnumerator for reassignment. Is this
> approximately correct?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#currentParallelism--
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/
>
>


Re: Zookeeper or Kubernetes for HA?

2021-05-03 Thread Matthias Pohl
Hi Vishal,
Do I understand you correctly that you're wondering whether you should
stick to ZooKeeper HA on Kubernetes vs Kubernetes HA? You could argue that
ZooKeeper might be better since it has been supported for longer and is,
therefore, better tested. The Kubernetes HA implementation left the
experimental stage in 1.12 [1]. There are no major issues I am aware of.
Hence, it comes down to the question of whether you see the necessity of
having an additional component like ZooKeeper in your cluster.

Best,
Matthias

[1]
https://flink.apache.org/news/2020/12/10/release-1.12.0.html#kubernetes-high-availability-ha-service

On Mon, May 3, 2021 at 7:13 PM vishalovercome  wrote:

> The Flink docs provide details on setting up HA but don't provide any
> recommendations as such. For jobs running in Kubernetes and having a
> ZooKeeper deployment, which high-availability option would be more
> desirable?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink(1.12.2/scala 2.11) HA with Zk in kubernetes standalone mode deployment is not working

2021-05-03 Thread Till Rohrmann
Somewhere the system retrieves the address x.x.x.x:43092 which cannot be
connected to. Can you check that this points towards a valid Flink process?
Maybe it is some leftover information in the ZooKeeper from a previous run?
Maybe you can check what's written in the Znodes for
/leader/resource_manager_lock.
You can also enable DEBUG logs which will tell you a bit more about what is
happening.

Cheers,
Till

On Mon, May 3, 2021 at 7:12 PM Matthias Pohl  wrote:

> Hi Bhagi,
> Thanks for reaching out to the Flink community. The error the UI is
> showing is normal during an ongoing leader election. Additionally, the
> connection refused warnings seem to be normal according to other mailing
> list threads. Are you referring to the UI error as the issue you are
> facing?
>
> What puzzles me a bit are the timestamps of your provided logs. They do
> not seem to be fully aligned. Are there more logs that might indicate other
> issues?
>
> Matthias
>
> PS: I'm gonna add the user mailing list as this issue should usually be
> posted there.
>
> On Mon, May 3, 2021 at 5:21 PM bhagi@R  wrote:
>
>> Hi Team,
>>
>> I deployed a Kubernetes standalone-deployment Flink cluster with ZK HA, but
>> am facing some issues; I have attached the taskmanager and jobmanager logs.
>>
>> Can you please look at the logs and help me solve this issue?
>>
>> UI is throwing this error:
>>
>> {"errors":["Service temporarily unavailable due to an ongoing leader
>> election. Please refresh."]}
>>
>> jobmanager.log
>> <
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/file/t1598/jobmanager.log>
>>
>> taskmanager.log
>> <
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/file/t1598/taskmanager.log>
>>
>> screenshot-1.png
>> <
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/file/t1598/screenshot-1.png>
>>
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>
>


Re: Setup of Scala/Flink project using Bazel

2021-05-03 Thread Salva Alcántara
Hi Matthias,

Thanks a lot for your reply. I am already aware of that reference, but it's
not exactly what I need. What I'd like to have is the typical word count
(hello world) app migrated from sbt to bazel, in order to use it as a
template for my Flink/Scala apps.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink(1.12.2/scala 2.11) HA with Zk in kubernetes standalone mode deployment is not working

2021-05-03 Thread Matthias Pohl
Hi Bhagi,
Thanks for reaching out to the Flink community. The error the UI is showing
is normal during an ongoing leader election. Additionally, the connection
refused warnings seem to be normal according to other mailing list
threads. Are you referring to the UI error as the issue you are facing?

What puzzles me a bit are the timestamps of your provided logs. They do not
seem to be fully aligned. Are there more logs that might indicate other
issues?

Matthias

PS: I'm gonna add the user mailing list as this issue should usually be
posted there.

On Mon, May 3, 2021 at 5:21 PM bhagi@R  wrote:

> Hi Team,
>
> I deployed a Kubernetes standalone-deployment Flink cluster with ZK HA, but
> am facing some issues; I have attached the taskmanager and jobmanager logs.
>
> Can you please look at the logs and help me solve this issue?
>
> UI is throwing this error:
>
> {"errors":["Service temporarily unavailable due to an ongoing leader
> election. Please refresh."]}
>
> jobmanager.log
> <
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/file/t1598/jobmanager.log>
>
> taskmanager.log
> <
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/file/t1598/taskmanager.log>
>
> screenshot-1.png
> <
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/file/t1598/screenshot-1.png>
>
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-03 Thread Till Rohrmann
This is great news. Thanks a lot for being our release managers Dawid and
Guowei! And also thanks to everyone who has made this release possible :-)

Cheers,
Till

On Mon, May 3, 2021 at 5:46 PM vishalovercome  wrote:

> This is a very big release! Many thanks to the Flink developers for their
> contributions to making Flink as good a framework as it is!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: remote task manager netty exception

2021-05-03 Thread Roman Khachatryan
Hi,

I see that JM and TM failures are different (from TM, it's actually a
warning). Could you please share the ERROR message from TM?

Have you tried increasing taskmanager.network.retries [1]?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#taskmanager-network-retries

Regards,
Roman

On Fri, Apr 30, 2021 at 11:55 PM Sihan You  wrote:
>
> Hi,
>
> We are experiencing a netty issue with our Flink cluster, for which we
> couldn't figure out the cause.
>
> Below is the stack trace of the exceptions from the TMs' and JM's perspectives. We
> have 85 TMs and one JM in HA mode. The strange thing is that only 23 of the
> TMs are complaining about the connection issue. When this exception occurs,
> the TM they are complaining about is still up and alive. This will cause our
> job to be stuck in a restart loop for a couple of hours and then go back to normal.
>
> We are using HDFS as the state backend and the checkpoint dir.
> the application is running in our own data center and in Kubernetes as a 
> standalone job.
>
>
> ## Job Graph
>
> the job graph is like this.
> source 1.1 (5 parallelism).  ->
>   union ->
> source 1.2 (80 parallelism) ->
> connect 
> -> sink
> source 2.1 (5 parallelism).  ->
>   union ->
> source 2.2 (80 parallelism) ->
>
>
> ## JM's Stacktrace
>
> ```
> message="PLI Deduplicate Operator (60/80) (5d2b9fba2eaeae452068bc53e4232d0c) 
> switched from RUNNING to FAILED on 100.98.115.117:6122-924d20 @ 
> 100.98.115.117 
> (dataPort=41245)."org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>  Sending the partition request to 'null' failed. at 
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.runtime.io.network.netty.NettyMessage.writeToChannel(NettyMessage.java:737)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:521)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:171)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>  ~[flink-dist_2.12-1.12.2.jar:1.12.2] at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerCont

Re: Protobuf support with Flink SQL and Kafka Connector

2021-05-03 Thread Matthias Pohl
Hi Shipeng,
it looks like there is an open Jira issue FLINK-18202 [1] addressing this
topic. You might want to follow up on that one. I'm adding Timo and Jark to
this thread. They might have more insights.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-18202

On Sat, May 1, 2021 at 2:00 AM Fuyao Li  wrote:

> Hello Shipeng,
>
>
>
> I am not an expert in Flink, just want to share some of my thoughts. Maybe
> others can give you better ideas.
>
> I think there is no directly available Protobuf support for Flink SQL.
> However, you can write a user-defined format to support it [1].
>
> If you use the DataStream API, you can leverage the Kryo serializer to serialize
> and deserialize the Protobuf format [2]. There is an out-of-the-box
> integration for Protobuf here. You will need to convert it to Flink SQL
> after data ingestion.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html#user-defined-sources-sinks
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
>
>
>
> Best,
>
> Fuyao
>
>
>
>
>
> *From: *Shipeng Xie 
> *Date: *Friday, April 30, 2021 at 14:58
> *To: *user@flink.apache.org 
> *Subject: *[External] : Protobuf support with Flink SQL and Kafka
> Connector
>
> Hi,
>
>
>
> In
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/
> ,
> it does not mention protobuf format. Does Flink SQL support protobuf
> format? If not, is there any plan to support it in the near future?
>
> Thanks!
>
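Following up on the Kryo route Fuyao mentions above [2], a hedged sketch of the registration step; MyProtoRecord stands in for a protoc-generated message class and the com.twitter:chill-protobuf artifact is assumed to be on the classpath:

```java
// Hedged sketch: route (de)serialization of a Protobuf type through Kryo's
// ProtobufSerializer instead of Flink's default handling of the class.
import com.twitter.chill.protobuf.ProtobufSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProtobufKryoSetup {

    // Placeholder: in a real job this is a class generated by protoc.
    static class MyProtoRecord {}

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().registerTypeWithKryoSerializer(MyProtoRecord.class, ProtobufSerializer.class);
    }
}
```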


[ANNOUNCE] Apache Flink 1.13.0 released

2021-05-03 Thread Dawid Wysakowicz
The Apache Flink community is very happy to announce the release of
Apache Flink 1.13.0.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the
improvements in this release:
https://flink.apache.org/news/2021/05/03/release-1.13.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Guowei & Dawid




Re: java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.R

2021-05-03 Thread Matthias Pohl
Hi Ragini,
this is a dependency version issue. Flink 1.8.x does not support Hadoop 3,
yet. The support for Apache Hadoop 3.x was added in Flink 1.11 [1] through
FLINK-11086 [2]. You would need to upgrade to a more recent Flink version.

Best,
Matthias

[1]
https://flink.apache.org/news/2020/07/06/release-1.11.0.html#important-changes
[2] https://issues.apache.org/jira/browse/FLINK-11086

On Mon, May 3, 2021 at 3:05 PM Ragini Manjaiah 
wrote:

> Hi Team,
> I have Flink 1.8.1 and open-source Hadoop 3.2.0. My Flink jobs run
> without issues on HDP 2.5.3. When run on open-source Hadoop 3.2.0,
> I encounter the exception mentioned below.
> I have set the Hadoop environment:
> export HADOOP_CONF_DIR=/etc/hadoop/conf
> export HADOOP_CLASSPATH=`hadoop classpath`
>
>
> SLF4J: Class path contains multiple SLF4J bindings.
>
> SLF4J: Found binding in
> [jar:file:/home_dir/svsap61/flink-1.8.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>
> SLF4J: Found binding in
> [jar:file:/usr/share/hadoop-tgt-3.2.0.1.0.0.11/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
>
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>
> java.lang.IllegalAccessError: tried to access method
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;
> from class
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
>
> at
> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
>
> at
> org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:188)
>
> at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:118)
>
> at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:93)
>
> at
> org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
>
> at
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:195)
>
> at
> org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
>
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.getClusterDescriptor(FlinkYarnSessionCli.java:1013)
>
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createDescriptor(FlinkYarnSessionCli.java:274)
>
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:454)
>
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:97)
>
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:224)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>


Re: Setup of Scala/Flink project using Bazel

2021-05-03 Thread Matthias Pohl
Hi Salva,
unfortunately, I have no experience with Bazel. Just by looking at the code
you shared I cannot come up with an answer either. Have you checked out the
ML thread in [1]? It provides two other examples where users used Bazel in
the context of Flink. This might give you some hints on where to look.

Sorry for not being more helpful.

Best,
Matthias

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-anyone-have-an-example-of-Bazel-working-with-Flink-td35898.html

On Fri, Apr 30, 2021 at 11:57 AM Salva Alcántara 
wrote:

> I am trying to setup a simple flink application from scratch using Bazel.
> I've bootstrapped the project by running
>
> ```
> sbt new tillrohrmann/flink-project.g8
> ```
>
> and after that I have added some files in order for Bazel to take control
> of
> the building (i.e., migrate from sbt). This is how the `WORKSPACE` looks
> like
>
> ```
> # WORKSPACE
> load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
>
> skylib_version = "1.0.3"
> http_archive(
> name = "bazel_skylib",
> sha256 =
> "1c531376ac7e5a180e0237938a2536de0c54d93f5c278634818e0efc952dd56c",
> type = "tar.gz",
> url =
> "
> https://mirror.bazel.build/github.com/bazelbuild/bazel-skylib/releases/download/{}/bazel-skylib-{}.tar.gz
> ".format(skylib_version,
> skylib_version),
> )
>
> rules_scala_version = "5df8033f752be64fbe2cedfd1bdbad56e2033b15"
>
> http_archive(
> name = "io_bazel_rules_scala",
> sha256 =
> "b7fa29db72408a972e6b6685d1bc17465b3108b620cb56d9b1700cf6f70f624a",
> strip_prefix = "rules_scala-%s" % rules_scala_version,
> type = "zip",
> url = "https://github.com/bazelbuild/rules_scala/archive/%s.zip"; %
> rules_scala_version,
> )
>
> # Stores Scala version and other configuration
> # 2.12 is a default version, other versions can be use by passing them
> explicitly:
> load("@io_bazel_rules_scala//:scala_config.bzl", "scala_config")
> scala_config(scala_version = "2.12.11")
>
> load("@io_bazel_rules_scala//scala:scala.bzl", "scala_repositories")
> scala_repositories()
>
> load("@io_bazel_rules_scala//scala:toolchains.bzl",
> "scala_register_toolchains")
> scala_register_toolchains()
>
> load("@io_bazel_rules_scala//scala:scala.bzl", "scala_library",
> "scala_binary", "scala_test")
>
> # optional: setup ScalaTest toolchain and dependencies
> load("@io_bazel_rules_scala//testing:scalatest.bzl",
> "scalatest_repositories", "scalatest_toolchain")
> scalatest_repositories()
> scalatest_toolchain()
>
> load("//vendor:workspace.bzl", "maven_dependencies")
> maven_dependencies()
>
> load("//vendor:target_file.bzl", "build_external_workspace")
> build_external_workspace(name = "vendor")
> ```
>
> and this is the `BUILD` file
>
> ```bazel
> package(default_visibility = ["//visibility:public"])
>
> load("@io_bazel_rules_scala//scala:scala.bzl", "scala_library",
> "scala_test")
>
> scala_library(
> name = "job",
> srcs = glob(["src/main/scala/**/*.scala"]),
> deps = [
> "@vendor//vendor/org/apache/flink:flink_clients",
> "@vendor//vendor/org/apache/flink:flink_scala",
> "@vendor//vendor/org/apache/flink:flink_streaming_scala",
> ]
> )
> ```
>
> I'm using [bazel-deps](https://github.com/johnynek/bazel-deps) for
> vendoring
> the dependencies (put in the `vendor` folder). I have this on my
> `dependencies.yaml` file:
>
> ```yaml
> options:
>   buildHeader: [
>   "load(\"@io_bazel_rules_scala//scala:scala_import.bzl\",
> \"scala_import\")",
>   "load(\"@io_bazel_rules_scala//scala:scala.bzl\", \"scala_library\",
> \"scala_binary\", \"scala_test\")",
>   ]
>   languages: [ "java", "scala:2.12.11" ]
>   resolverType: "coursier"
>   thirdPartyDirectory: "vendor"
>   resolvers:
> - id: "mavencentral"
>   type: "default"
>   url: https://repo.maven.apache.org/maven2/
>   strictVisibility: true
>   transitivity: runtime_deps
>   versionConflictPolicy: highest
>
> dependencies:
>   org.apache.flink:
> flink:
>   lang: scala
>   version: "1.11.2"
>   modules: [clients, scala, streaming-scala] # provided
> flink-connector-kafka:
>   lang: java
>   version: "0.10.2"
> flink-test-utils:
>   lang: java
>   version: "0.10.2"
> ```
>
> For downloading the dependencies, I'm running
>
> ```
> bazel run //:parse generate -- --repo-root ~/Projects/bazel-flink-scala
> --sha-file vendor/workspace.bzl --target-file vendor/target_file.bzl --deps
> dependencies.yaml
> ```
>
> Which runs just fine, but then when I try to build the project
>
> ```
> bazel build //:job
> ```
>
> I'm getting this error
>
> ```
> Starting local Bazel server and connecting to it...
> ERROR: Traceback (most recent call last):
> File
> "/Users/salvalcantara/Projects/me/bazel-flink-scala/WORKSPACE", line
> 44, column 25, in 
> build_external_workspace(name = "vendor")
> File
>
> "/Users/salvalcantara/Projects/me/bazel-flink-scala/vendor/target_file.bzl",
> line 2

java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.Reque

2021-05-03 Thread Ragini Manjaiah
Hi Team,
I have Flink 1.8.1 and open-source Hadoop 3.2.0. My Flink jobs run
without issues on HDP 2.5.3. When run on open-source Hadoop 3.2.0,
I encounter the exception mentioned below.
I have set the Hadoop environment:
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=`hadoop classpath`


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home_dir/svsap61/flink-1.8.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/hadoop-tgt-3.2.0.1.0.0.11/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

java.lang.IllegalAccessError: tried to access method org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object; from class org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
    at org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider.init(RequestHedgingRMFailoverProxyProvider.java:75)
    at org.apache.hadoop.yarn.client.RMProxy.createRMFailoverProxyProvider(RMProxy.java:188)
    at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:118)
    at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:93)
    at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:195)
    at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.getClusterDescriptor(FlinkYarnSessionCli.java:1013)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createDescriptor(FlinkYarnSessionCli.java:274)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:454)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClusterDescriptor(FlinkYarnSessionCli.java:97)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:224)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)


Re: Flink Version 1.11 job savepoint failures

2021-05-03 Thread Matthias Pohl
Hi Rainie,
the savepoint creation failed because some tasks had already finished. It
looks like you ran into an issue that was addressed, at least partially, in
Flink 1.13 (FLINK-21066 [1] is only a subtask of a bigger effort). I'm
pulling Yun Gao into this thread; let's see whether Yun can confirm that
finding.

I hope that helps.
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-21066

On Mon, May 3, 2021 at 9:07 AM Rainie Li  wrote:

> Hi Flink Community,
>
> Our flink jobs are in version 1.11 and we use this to trigger savepoint.
> $ bin/flink savepoint :jobId [:targetDirectory]
> We can get trigger Id with savepoint path successfully.
>
> But we saw these errors by querying savepoint endpoint:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html#jobs-jobid-savepoints-triggerid
> e.g. application_id/jobs/job_id/savepoints/trigger_id
>
> {
> *  "*errors*": *[
> "org.apache.flink.runtime.rest.NotFoundException: Operation not found
> under key:
> org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@8893e196\n\tat
> org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers$StatusHandler.handleRequest(AbstractAsynchronousOperationHandlers.java:167)\n\tat
> org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.handleRequest(SavepointHandlers.java:193)\n\tat
> org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:73)\n\tat
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:178)\n\tat
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:81)\n\tat
> java.util.Optional.ifPresent(Optional.java:159)\n\tat
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:46)\n\tat
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:78)\n\tat
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)\n\tat
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)\n\tat
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)\n\tat
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedCha

Scale up REGEX pipeline

2021-05-03 Thread Antón Rodríguez Yuste
Hi community,

I'm working on a pipeline which needs to apply several regex expressions
for matching. I have around 10K regex expressions but, depending on some
metadata in the message, I only need to apply 5-10 of them for any specific
message.

I've been doing some research with java.util.regex, com.google.re2j and
org.apache.regexp.RE, but none of them fit my requirements:

   - If I don't compile them, matching takes too much time per operation.
   - If I compile them all, the compiled patterns occupy a huge amount of
   memory, which makes the process too expensive.

It's OK if I skip some messages, so I was considering implementing a lazy
cache so that I only keep the compiled regex patterns which are "hot" in
memory. This solution seems quite complex and not ideal, though.
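For concreteness, this is roughly the kind of cache I have in mind (a plain
JDK sketch, not Flink-specific; the maximum size is an arbitrary placeholder).
In a Flink job it would live as a transient field built in open() of a
RichFunction, so each parallel instance keeps its own cache and no extra
synchronization is needed:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Bounded, access-ordered cache of compiled patterns: only "hot" regexes stay in memory. */
public class PatternCache {

    private static final int MAX_ENTRIES = 1_000; // placeholder, tune per available heap

    // accessOrder = true turns the LinkedHashMap into an LRU structure.
    private final Map<String, Pattern> cache =
            new LinkedHashMap<String, Pattern>(MAX_ENTRIES, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Pattern> eldest) {
                    // Evict the least-recently-used compiled pattern once the cap is reached.
                    return size() > MAX_ENTRIES;
                }
            };

    public Pattern get(String regex) {
        // Compile lazily on first use; later lookups reuse the compiled pattern.
        return cache.computeIfAbsent(regex, Pattern::compile);
    }

    public boolean matches(String regex, String input) {
        return get(regex).matcher(input).matches();
    }
}
```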

Do you know any other alternative / idea to tackle this?

Cheers,

Antón


Re: Use State query to dump state into datalake

2021-05-03 Thread David Anderson
I think you'd be better off using the State Processor API [1] instead. The
State Processor API has cleaner semantics -- as you'll be seeing a
self-consistent snapshot of all the state -- and it's also much more
performant.

Note also that the Queryable State API is "approaching end of life" [2].
The long-term objective is to replace this with something more useful.
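As a rough illustration (the savepoint path, operator uid, state name,
key/value types and state backend below are placeholders you would adapt to
your job), reading a keyed MapState out of a savepoint and writing it out as
text looks roughly like this:

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

public class StateDumpJob {

    // Assumed shape: MapState<String, Long> named "my-map-state" under operator uid "enrichment".
    public static class MapStateReader extends KeyedStateReaderFunction<String, String> {

        private transient MapState<String, Long> mapState;

        @Override
        public void open(Configuration parameters) {
            mapState = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("my-map-state", String.class, Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            // Emit one CSV-style row per map entry of this key.
            for (Map.Entry<String, Long> entry : mapState.entries()) {
                out.collect(key + "," + entry.getKey() + "," + entry.getValue());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Assumption: pass a state backend compatible with the one that wrote the
        // savepoint (e.g. RocksDBStateBackend for a RocksDB job).
        ExistingSavepoint savepoint =
                Savepoint.load(env, "hdfs:///savepoints/savepoint-xxxx", new MemoryStateBackend());

        DataSet<String> rows = savepoint.readKeyedState("enrichment", new MapStateReader());
        rows.writeAsText("hdfs:///datalake/state-dump");

        env.execute("dump-map-state");
    }
}
```

You could swap the text sink for a Parquet output format if you want the dump
directly in your lake format.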

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
[2] https://flink.apache.org/roadmap.html

On Sun, May 2, 2021 at 9:07 PM Lian Jiang  wrote:

> Hi,
>
> I am interested in dumping Flink state from Rockdb to datalake using state
> query
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/.
> My map state could have 200 million key-value pairs and the total size
> could be 150 GB. My batch job scheduled using Airflow will have one
> task which uses Flink state query to dump the Flink state to datalake in
> parquet format so other spark tasks can use it.
>
> Is there any scalability concern for using state query in this way?
> Appreciate any insight. Thanks!
>


Flink Version 1.11 job savepoint failures

2021-05-03 Thread Rainie Li
Hi Flink Community,

Our Flink jobs are on version 1.11 and we use this command to trigger a
savepoint:
$ bin/flink savepoint :jobId [:targetDirectory]
We can get the trigger id with the savepoint path successfully.
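(For reference, the raw REST calls we use look roughly like this; host, port,
target directory and the IDs are placeholders.)

```
# Trigger the savepoint; the JSON response contains "request-id" (the trigger id).
curl -s -X POST "http://<jobmanager>:8081/jobs/<job_id>/savepoints" \
  -H "Content-Type: application/json" \
  -d '{"target-directory": "hdfs:///savepoints", "cancel-job": false}'

# Poll the status of that savepoint with the returned trigger id.
curl -s "http://<jobmanager>:8081/jobs/<job_id>/savepoints/<trigger_id>"
```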

But we saw these errors when querying the savepoint status endpoint:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html#jobs-jobid-savepoints-triggerid
e.g. application_id/jobs/job_id/savepoints/trigger_id

{
*  "*errors*": *[
"org.apache.flink.runtime.rest.NotFoundException: Operation not found
under key:
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@8893e196\n\tat
org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers$StatusHandler.handleRequest(AbstractAsynchronousOperationHandlers.java:167)\n\tat
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.handleRequest(SavepointHandlers.java:193)\n\tat
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:73)\n\tat
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:178)\n\tat
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:81)\n\tat
java.util.Optional.ifPresent(Optional.java:159)\n\tat
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:46)\n\tat
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:78)\n\tat
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)\n\tat
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)\n\tat
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)\n\tat
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)\n\tat
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)\n\tat
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)\n\tat
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java: