Upgrade job topology in checkpoint

2021-06-11 Thread Padarn Wilson
Hi all,

I'm looking for some clarity about changing job topology as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/upgrading/#application-topology

My question is simple: does this only apply to savepoints, or can it also
work for checkpoints? (And if not, why not?)

Cheers,
Padarn


Checkpoint is timing out - inspecting state

2021-06-11 Thread Dan Hill
Hi.

We're doing something bad with our Flink state.  We just launched a feature
that creates very big values (lists of objects that we append to) in
MapState.

Our checkpoints time out (10 minutes).  I'm assuming the values are too
big.  Backpressure is okay, and CPU and memory metrics look okay.

Questions

1. Is there an easy tool for inspecting the Flink state?

I found this post about drilling into Flink state.
I was hoping for something more like a CLI.

2. Is there a way to break down the time spent during a checkpoint if it
times out?

Thanks!
- Dan
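
(For question 1, one option is the State Processor API. Below is a hedged, minimal sketch of reading how many entries each key holds in a MapState from a retained checkpoint or savepoint; the operator uid, state name, key/value types, path, and state backend are illustrative assumptions that must match what the job actually registered, and it needs the flink-state-processor-api dependency.)

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class InspectMapState {

    // Emits (key, number of MapState entries) for every key in the snapshot.
    public static class MapSizeReader extends KeyedStateReaderFunction<String, Tuple2<String, Integer>> {
        private transient MapState<String, String> mapState;

        @Override
        public void open(Configuration parameters) {
            // State name and types are assumptions; they must match the job's own descriptor.
            mapState = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("my-map-state", String.class, String.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            int entries = 0;
            for (String ignored : mapState.keys()) {
                entries++;
            }
            out.collect(Tuple2.of(key, entries));
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Placeholders: point at the snapshot and pass the state backend the job used for it
        // (MemoryStateBackend is only an illustration here).
        ExistingSavepoint snapshot =
                Savepoint.load(env, "s3://my-bucket/checkpoints/my-job/chk-42", new MemoryStateBackend());

        DataSet<Tuple2<String, Integer>> sizes =
                snapshot.readKeyedState("my-operator-uid", new MapSizeReader());
        sizes.print();
    }
}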


Re: Output from RichAsyncFunction on failure

2021-06-11 Thread Satish Saley
One way I thought to achieve this is:
1. For failures, add a special record to the result collection in the RichAsyncFunction.
2. Filter those special records out of the DataStream and push them to another
Kafka topic.
Let me know if it makes sense.
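
(A hedged sketch of that idea; the wrapper type, the retrying client call, and the print() sinks are illustrative stand-ins rather than code from this thread, while RichAsyncFunction, ResultFuture, and AsyncDataStream are the actual Flink APIs involved.)

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class EnrichWithFailureTopic {

    // Wrapper record: either an enriched event or a failure marker.
    public static class EnrichedOrError {
        public String raw;       // original event
        public String enriched;  // null if the remote call ultimately failed
        public boolean failed;

        public EnrichedOrError() {}

        public EnrichedOrError(String raw, String enriched, boolean failed) {
            this.raw = raw;
            this.enriched = enriched;
            this.failed = failed;
        }
    }

    // Async enrichment that never fails the job; exhausted retries become marker records.
    public static class EnrichAsyncFunction extends RichAsyncFunction<String, EnrichedOrError> {
        @Override
        public void asyncInvoke(String event, ResultFuture<EnrichedOrError> resultFuture) {
            callRemoteServiceWithRetries(event).whenComplete((info, err) -> {
                if (err != null) {
                    resultFuture.complete(Collections.singleton(new EnrichedOrError(event, null, true)));
                } else {
                    resultFuture.complete(Collections.singleton(new EnrichedOrError(event, info, false)));
                }
            });
        }

        @Override
        public void timeout(String event, ResultFuture<EnrichedOrError> resultFuture) {
            // Also turn Flink-side async timeouts into a marker instead of an exception.
            resultFuture.complete(Collections.singleton(new EnrichedOrError(event, null, true)));
        }
    }

    // Hypothetical client call that already retries X times internally.
    static CompletableFuture<String> callRemoteServiceWithRetries(String event) {
        return CompletableFuture.completedFuture("info-for-" + event);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> events = env.fromElements("a", "b", "c"); // stand-in for the Kafka source

        DataStream<EnrichedOrError> enriched =
                AsyncDataStream.unorderedWait(events, new EnrichAsyncFunction(), 30, TimeUnit.SECONDS);

        // Failed calls would go to a Kafka sink for the "failures" topic; print() stands in here.
        enriched.filter(e -> e.failed).map(e -> e.raw).returns(Types.STRING).print("failed");
        enriched.filter(e -> !e.failed).print("enriched");

        env.execute("enrich-with-failure-topic");
    }
}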


On Fri, Jun 11, 2021 at 10:40 AM Satish Saley 
wrote:

> Hi,
> - I have a kafka consumer to read events.
> - Then, I have RichAsyncFunction to call a remote service to get
> more information about that event.
>
> If the remote call fails after X number of retries, I don't want flink to
> fail the job and start processing from the beginning. Instead I would like
> to push info about failed call to another Kafka topic. Is there a way to
> achieve this?
>


Output from RichAsyncFunction on failure

2021-06-11 Thread Satish Saley
Hi,
- I have a kafka consumer to read events.
- Then, I have RichAsyncFunction to call a remote service to get
more information about that event.

If the remote call fails after X number of retries, I don't want flink to
fail the job and start processing from the beginning. Instead I would like
to push info about failed call to another Kafka topic. Is there a way to
achieve this?


Re: How to know (in code) how many times the job restarted?

2021-06-11 Thread Felipe Gutierrez
Cool!

I did it using this example
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
because I don't have a keyed stream on the specific operator where I want to
count the number of restarts (yes, I am unfortunately still on version 1.4).
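
(A hedged, minimal sketch of that managed-operator-state pattern; the class, state name, and wiring are illustrative and not the exact code from this thread.)

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class RestartCountingMap extends RichMapFunction<String, String>
        implements CheckpointedFunction {

    private transient ListState<Integer> restartsState;
    private int restarts;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        restartsState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("restart-count", Integer.class));
        if (context.isRestored()) {
            for (Integer previous : restartsState.get()) {
                restarts = previous; // at most one element per subtask in this sketch
            }
            restarts++; // isRestored() == true means this initialization is a recovery
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        restartsState.clear();
        restartsState.add(restarts);
    }

    @Override
    public String map(String value) {
        // In a ProcessFunction the count could instead go to a side output for the test sink.
        return value + " (restarts so far: " + restarts + ")";
    }
}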

Because I need to test it in an integration test, I am using a side output (
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
to attach a sink. I am not sure if you have a better idea for testing the
restarts in an integration test. If you have a simpler idea, please tell me
:). This was the way that I solved it.

Thanks
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez


On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan  wrote:

> Hi Felipe,
>
> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
> that depending on the configuration only a pipeline region can be
> restarted, not the whole job).
>
> But if all you want is to check whether it's a first attempt or not,
> you can also call context.isRestored() from initializeState() [2]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>
> Regards,
> Roman
>
>
> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>  wrote:
> >
> > Hello community,
> >
> > Is it possible to know programmatically how many times my Flink stream
> job restarted since it was running?
> >
> > My use case is like this. I have an Unit test that uses checkpoint and I
> throw one exception in a MapFunction for a given time, i.e.: for the 2
> seconds ahead. Because Flink restarts the job and I have checkpoint I can
> recover the state and after 2 seconds I don't throw any exception anymore.
> Then I would like to know how many times the job was restarted.
> >
> > Thanks,
> > Felipe
> >
>


Re: 1.13.1 jobmanager annotations by pod template does not work

2021-06-11 Thread 陳昌倬
On Fri, Jun 11, 2021 at 11:19:09PM +0800, Yang Wang wrote:
> Could you please share your pod template and the value of
> kubernetes.jobmanager.annotations?
>
> Usually the annotations of pod template and flink config options will be
> merged. And the flink config
> options has higher priority if you are specifying same name annotation.
>
> I have verified in minikube and it could take effect as expected.

Hi,

There are other findings for this issue:

* For jobmanager:
  * annotations and labels in the pod template do not work.
  * annotations and labels in -Dkubernetes.jobmanager.* work.

* For taskmanager:
  * annotations and labels in the pod template work.


The following is jobmanager pod template:

apiVersion: batch/v1
kind: Pod
metadata:
  labels:
    app: jobmanager
    helm.sh/chart: 
    app.kubernetes.io/name: 
    app.kubernetes.io/instance: 
    app.kubernetes.io/version: 
    app.kubernetes.io/managed-by: Helm
  annotations:
    rt.prometheus.io/scrape: 'true'
    rt.prometheus.io/path: '/'
    rt.prometheus.io/port: '9249'


The following is pod created as jobmanager:


Name: 
Namespace:
Priority: 200
Priority Class Name:  medium
Node: 
Start Time:   Fri, 11 Jun 2021 23:38:52 +0800
Labels:   app=
  component=jobmanager
  pod-template-hash=55846fd8f7
  type=flink-native-kubernetes
Annotations:  


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: 1.13.1 jobmanager annotations by pod template does not work

2021-06-11 Thread Yang Wang
Could you please share your pod template and the value of
kubernetes.jobmanager.annotations?

Usually the annotations from the pod template and the Flink config options will
be merged, and the Flink config options have higher priority if you specify an
annotation with the same name.

I have verified this in minikube and it takes effect as expected.

Best,
Yang

ChangZhuo Chen (陳昌倬) wrote on Fri, Jun 11, 2021 at 4:52 PM:

>
> Hi,
>
> We found that jobmanager annotations defined by the pod template do not
> work. However, annotations defined by kubernetes.jobmanager.annotations
> [0] do work.
>
> This behavior is different from document [1].
>
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#kubernetes-jobmanager-annotations
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: Looking for online live training courses

2021-06-11 Thread B.B.
There are some beginner courses on Pluralsight. Just look for those with
newer dates.

BR,
BB

On Fri, 11 Jun 2021 at 03:33, Xia(Nate) Qu  wrote:

> Hi all,
>
> My team is planning to start our journey of Apache Flink, was wondering if
> there are any professional training courses (online interactive at this
> moment) recommended? Thanks
>
>
> Best,
>
> *Xia(Nate) Qu*
>
> --
Everybody wants to be a winner
Nobody wants to lose their game
Its insane for me
Its insane for you
Its insane


Re: Add control mode for flink

2021-06-11 Thread 刘建刚
Thanks Till for the reply. The suggestions are really helpful for the
topic. Maybe some of what I mentioned was not clear or detailed enough. Here is
what I want to say:

   1. Changing the log level is not suitable for this topic, as you said. Because
   our internal log4j is old, this feature was implemented in a hacky way
   through the restful interface. Since it can be done through the logging backend and
   is not related to control events, please forgive my mistake and just ignore
   it.
   2. I totally agree with you that data consistency is important for
   Flink. So in our initial plan, control events should flow from the sources to
   downstream tasks. In this way, they can combine with the current checkpoint
   mechanism. Every detail should be carefully considered to ensure exactly-once.
   3. For quick recovery, I have mentioned that some work is being done to
   solve it, like generalized incremental checkpoints. It is really helpful to
   us and can resolve many problems. But maybe some special corner cases cannot
   be covered; for example, uploading a big jar may consume some time.

More people are welcome to discuss the topic and give suggestions and ideas to
make Flink better.

Till Rohrmann [via Apache Flink User Mailing List archive.] <
ml+s2336050n44392...@n4.nabble.com> wrote on Fri, Jun 11, 2021 at 4:46 PM:

> Thanks for starting this discussion. I do see the benefit of dynamically
> configuring your Flink job and the cluster running it. Some of the use
> cases which were mentioned here are already possible. E.g. adjusting the
> log level dynamically can be done by configuring an appropriate logging
> backend and then changing the logging properties (log4j 2 supports this for
> example). Then the remaining use cases can be categorized into two
> categories:
>
> 1) changing the job
> 2) changing the cluster configuration
>
> 1) would benefit from general control flow events which will be processed
> by all operators. 2) would require some component sending some control
> events to the other Flink processes.
>
> Implementing the control flow events can already be done to a good extent
> on the user level by using a connected stream and a user level-record type
> which can distinguish between control events and normal records.
> Admittedly, it is a bit of work, though.
>
> I think persisting all of these changes would be very important because
> otherwise, you might end up easily in an inconsistent state. For example,
> assume you have changed the log level and now a subset of the TaskManagers
> needs to be restarted. Now, all of a sudden some TaskManagers log on level
> X and the others on level Y. The same applies to job changes. A regional
> failover would have to restore the latest dynamically configured state. All
> in all, this looks like a very complex and complicated task.
>
> On the other hand, most of the described use cases should be realizable
> with a restart of a job. So if Flink were able to quickly resume a job,
> then we would probably not need this feature. Applying the changes to the
> Flink and the job configuration and resubmitting the job would do the
> trick. Hence, improving Flink's recovery speed could be an alternative
> approach to this problem.
>
> Cheers,
> Till
>
> On Fri, Jun 11, 2021 at 9:51 AM Jary Zhen <[hidden email]
> > wrote:
>
>>  big +1 for this feature,
>>
>>1. Reset kafka offset in certain cases.
>>2. Stop checkpoint in certain cases.
>>3. Change log level for debug.
>>
>>
>> 刘建刚 <[hidden email] >
>> wrote on Fri, Jun 11, 2021 at 12:17 PM:
>>
>>> Thanks for all the discussions and suggestions. Since the topic has
>>> been discussed for about a week, it is time to have a conclusion and new
>>> ideas are welcomed at the same time.
>>> First, the topic starts with use cases in restful interface. The
>>> restful interface supported many useful interactions with users, for
>>> example as follows. It is an easy way to control the job compared with
>>> broadcast api.
>>>
>>>1. Change data processing’ logic by dynamic configs, such as filter
>>>condition.
>>>2. Define some tools to control the job, such as QPS limit,
>>>sampling, change log level and so on.
>>>
>>> Second, we broaden the topic to control flow in order to support all
>>> kinds of control events besides the above user cases. There is a strong
>>> demand to support custom (broadcast) events for iteration, SQL control
>>> events and so on. As Xintong Song said, the key to the control flow lies as
>>> follows:
>>>
>>>1. Who (which component) is responsible for generating the control
>>>messages? It may be the jobmaster by some ways, the inner operator and so
>>>on.
>>>2. Who (which component) is responsible for reacting to the
>>>messages.
>>>3. How do the messages propagate? Flink should support sending
>>>control messages by channels.
>>>4. When it comes to affecting the computation logics, how should 

Re: Re:Re: How to implement a delayed dimension table join in Flink SQL?

2021-06-11 Thread chenchencc
temporal join



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re:Re:Re:Re: Syncing Flink SQL CDC data to MySQL

2021-06-11 Thread 东东



1. Upgrade to 1.13.
2. Whether it can keep up depends on how large the write volume actually is and on the downstream processing capacity; even MySQL's own primary-replica replication cannot always guarantee keeping up. You will only know by trying it in practice.
3. You can try setting the default parallelism; if you find operators are being chained together, try disabling operator chaining, as sketched below.
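
(A hedged sketch of those two settings on the execution environment; the values are illustrative.)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismAndChaining {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(4);          // default parallelism for operators without an explicit setting
        env.disableOperatorChaining();  // split chained operators into separate tasks to see where time goes
    }
}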


At 2021-06-11 18:57:36, "casel.chen" wrote:
>My scenario is just simple data synchronization, with no join and no group by, from one MySQL database to another MySQL database.
>The upstream writes data very fast; if I sync with Flink SQL, by default there is only one parallel writer and it cannot keep up. How should this situation be handled?
>I am using Flink 1.12.1, whose jdbc connector does not yet support the sink.parallelism parameter.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>At 2021-06-11 16:32:00, "东东" wrote:
>>
>>
>>
>>The case listed there is a shuffle before the sink that is not based on the primary key, for example when there is a join and the join condition is not the primary key. What does your scenario look like?
>>
>>
>>
>>
>>Also, https://issues.apache.org/jira/browse/FLINK-20374 is about sinking to ES, but the ES
>>connector does not support specifying sink.parallelism, so the sink parallelism is necessarily the upstream parallelism. The jdbc
>>connector can specify sink.parallelism; as long as it differs from the upstream parallelism, the runtime will do a hash
>>shuffle by the primary key, ensuring that records with the same pk go to the same sink task.
>>
>>
>>At 2021-06-11 15:57:29, "casel.chen" wrote:
>>>Quoting Leonard Xu's earlier answer:
>>>
 Flink 1.13's jdbc connector adds the sink.parallelism
 parameter. The question is: if the parallelism is increased, can it be guaranteed that records with the same primary key go to the same sink task? And what is the correct way to write the SQL?
>>>
>>>This applies not only to the sync scenario; in other scenarios you also need to be careful with
>>>the sink.parallelism parameter. Currently the framework does not guarantee that records with the same pk are sent to the same task; the user has to make sure that the pk defined on the sink
>>>stays consistent with the key of the upstream data shuffle (e.g. group key, join key),
>>>otherwise data may arrive out of order. The community is also working on solving this through plan derivation; see
>>>https://issues.apache.org/jira/browse/FLINK-20374 
>>>https://issues.apache.org/jira/browse/FLINK-22901 
>>>
>>>which shows that just adding sink.parallelism is not enough.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>At 2021-06-11 15:44:51, "JasonLee" <17610775...@163.com> wrote:
hi

The sink side can be set via the sink.parallelism option.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: PyFlink: Upload resource files to Flink cluster

2021-06-11 Thread Sumeet Malhotra
I'm using a standalone deployment on Kubernetes for this use case. Does the
archive get uploaded to the cluster via the :8081 REST/WebUI port or via
some other port like 6123/RPC or 6124/BLOB-SERVER? I'm wondering if not
exposing those ports on the local machine might prevent the archive from
getting loaded? Although I would have expected an explicit error in that
case.

NAMESPACE NAME   TYPE   PORTS
flink flink-jobmanager   ClusterIP  rpc:6123►0
blob-server:6124►0 webui:8081►0

Thanks,
Sumeet


On Fri, Jun 11, 2021 at 2:48 PM Roman Khachatryan  wrote:

> Hi Sumeet,
>
> Probably there is an issue with uploading the archive while submitting the
> job.
> The commands and API usage look good to me.
> Dian could you please confirm that?
>
> Regards,
> Roman
>
> On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra
>  wrote:
> >
> > Thank you Roman. Yes, that's what I am going to do.
> >
> > But I'm running into another issue... when I specify the --pyArchives
> option on the command line, the job never gets submitted and is stuck
> forever. And when I try to programmatically do this by calling
> add_python_archive(), the job gets submitted but fails because the target
> directory is not found on the UDF node. Flink is deployed on a K8S cluster
> in my case and the port 8081 is forwarded to the localhost.
> >
> > Here's the command line I use:
> >
> > ~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python
> my_job.py  --pyArchives file:///path/to/schema.zip#schema
> >
> > And within the UDF I'm access the schema file as:
> >
> > read_schema('schema/my_schema.json')
> >
> > Or if I try using the API instead of the command-line, the call looks as:
> >
> > env = StreamExecutionEnvironment.get_execution_environment()
> > env.add_python_archive('schema.zip', 'schema')
> >
> > Initially, my_job.py itself had its own command line options, and I was
> thinking that might interfere with the overall Flink command line options,
> but even after removing that I'm not able to submit the job anymore.
> However, if I don't use the --pyArchives option and manually transfer the
> schema file to a location on the UDF node, the job gets submitted and works
> as expected.
> >
> > Any reason why this might happen?
> >
> > Thanks,
> > Sumeet
> >
> >
> > On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan 
> wrote:
> >>
> >> Hi,
> >>
> >> I think the second option is what you need. The documentation says
> >> only zip format is supported.
> >> Alternatively, you could upload the files to S3 or other DFS and
> >> access from TMs and re-upload when needed.
> >>
> >> [1]
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
> >>
> >> Regards,
> >> Roman
> >>
> >> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
> >>  wrote:
> >> >
> >> > Hi,
> >> >
> >> > I'm using UDTFs in PyFlink, that depend upon a few resource files
> (JSON schema files actually). The path of this file can be passed into the
> UDTF, but essentially this path needs to exist on the Task Manager node
> where the task executes. What's the best way to upload these resource
> files? As of now, my custom Flink image creates a fixed path with the
> required resource files, but I'd like it to be run time configurable.
> >> >
> >> > There are 2 APIs available to load files when submitting a PyFlink
> job...
> >> >
> >> > stream_execution_environment.add_python_file() - Recommended to
> upload files (.py etc) but doesn't let me configure the final path on the
> target node. The files are added to PYTHONPATH, but it needs the UDTF
> function to lookup for this file. I'd like to pass the file location into
> the UDTF instead.
> >> >
> >> > stream_execution_environment.add_python_archive() - Appears to be
> more generic, in the sense that it allows a target directory to be
> specified. The documentation doesn't say anything about the contents of the
> archive, so I'm guessing it could be any type of file. Is this what is
> needed for my use case?
> >> >
> >> > Or is there any other recommended way to upload non-Python
> dependencies/resources?
> >> >
> >> > Thanks in advance,
> >> > Sumeet
>


Re:Re:Re:Re: Syncing Flink SQL CDC data to MySQL

2021-06-11 Thread casel.chen
My scenario is just simple data synchronization, with no join and no group by, from one MySQL database to another MySQL database.
The upstream writes data very fast; if I sync with Flink SQL, by default there is only one parallel writer and it cannot keep up. How should this situation be handled?
I am using Flink 1.12.1, whose jdbc connector does not yet support the sink.parallelism parameter.

















At 2021-06-11 16:32:00, "东东" wrote:
>
>
>
>The case listed there is a shuffle before the sink that is not based on the primary key, for example when there is a join and the join condition is not the primary key. What does your scenario look like?
>
>
>
>
>Also, https://issues.apache.org/jira/browse/FLINK-20374 is about sinking to ES, but the ES
>connector does not support specifying sink.parallelism, so the sink parallelism is necessarily the upstream parallelism. The jdbc
>connector can specify sink.parallelism; as long as it differs from the upstream parallelism, the runtime will do a hash
>shuffle by the primary key, ensuring that records with the same pk go to the same sink task.
>
>
>At 2021-06-11 15:57:29, "casel.chen" wrote:
>>Quoting Leonard Xu's earlier answer:
>>
>>> Flink 1.13's jdbc connector adds the sink.parallelism
>>> parameter. The question is: if the parallelism is increased, can it be guaranteed that records with the same primary key go to the same sink task? And what is the correct way to write the SQL?
>>
>>This applies not only to the sync scenario; in other scenarios you also need to be careful with
>>the sink.parallelism parameter. Currently the framework does not guarantee that records with the same pk are sent to the same task; the user has to make sure that the pk defined on the sink
>>stays consistent with the key of the upstream data shuffle (e.g. group key, join key),
>>otherwise data may arrive out of order. The community is also working on solving this through plan derivation; see
>>https://issues.apache.org/jira/browse/FLINK-20374 
>>https://issues.apache.org/jira/browse/FLINK-22901 
>>
>>which shows that just adding sink.parallelism is not enough.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>At 2021-06-11 15:44:51, "JasonLee" <17610775...@163.com> wrote:
>>>hi
>>>
>>>The sink side can be set via the sink.parallelism option.
>>>
>>>
>>>
>>>-
>>>Best Wishes
>>>JasonLee
>>>--
>>>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: NPE when restoring from savepoint in Flink 1.13.1 application

2021-06-11 Thread 陳昌倬
On Thu, Jun 10, 2021 at 07:10:45PM +0200, Roman Khachatryan wrote:
> Hi ChangZhuo,
> 
> Thanks for reporting, it looks like a bug.
> I've opened a ticket for that [1].
> 
> [1]
> https://issues.apache.org/jira/browse/FLINK-22966

Thanks for the help.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Flink 1.12: logging to Kafka with the default log4j2, how to distinguish JobManager and TaskManager logs? What parameters should be added?

2021-06-11 Thread DanielGu
I would also like to see such a configuration.
Thanks in advance, everyone.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: [table-walkthrough] docker-compose build issues

2021-06-11 Thread Lingfeng Pu
Hi,

I apologize that I forgot the attachments in my last post. I'll repost my
question with attachments this time:

I have successfully run the "table-walkthrough" project in IDEA (without
errors, only warnings). I'm now trying to build this project with the
"docker-compose" command as the tutorial does. However, when I run the
"docker-compose build" command in the directory of the "table-walkthrough"
project, it returns the following error messages:

[AkatsukiG5@localhost table-walkthrough]$ docker-compose build
Traceback (most recent call last):
  File "/usr/lib/python3.9/site-packages/urllib3/connectionpool.py", line
670, in urlopen
httplib_response = self._make_request(
  File "/usr/lib/python3.9/site-packages/urllib3/connectionpool.py", line
392, in _make_request
conn.request(method, url, **httplib_request_kw)
  File "/usr/lib64/python3.9/http/client.py", line 1253, in request
self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib64/python3.9/http/client.py", line 1299, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib64/python3.9/http/client.py", line 1248, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib64/python3.9/http/client.py", line 1008, in _send_output
self.send(msg)
  File "/usr/lib64/python3.9/http/client.py", line 948, in send
self.connect()
  File "/usr/lib/python3.9/site-packages/docker/transport/unixconn.py",
line 43, in connect
sock.connect(self.unix_socket)
FileNotFoundError: [Errno 2] No such file or directory

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.9/site-packages/requests/adapters.py", line 439,
in send
resp = conn.urlopen(
  File "/usr/lib/python3.9/site-packages/urllib3/connectionpool.py", line
726, in urlopen
retries = retries.increment(
  File "/usr/lib/python3.9/site-packages/urllib3/util/retry.py", line 403,
in increment
raise six.reraise(type(error), error, _stacktrace)
  File "/usr/lib/python3.9/site-packages/urllib3/packages/six.py", line
708, in reraise
raise value.with_traceback(tb)
  File "/usr/lib/python3.9/site-packages/urllib3/connectionpool.py", line
670, in urlopen
httplib_response = self._make_request(
  File "/usr/lib/python3.9/site-packages/urllib3/connectionpool.py", line
392, in _make_request
conn.request(method, url, **httplib_request_kw)
  File "/usr/lib64/python3.9/http/client.py", line 1253, in request
self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib64/python3.9/http/client.py", line 1299, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib64/python3.9/http/client.py", line 1248, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib64/python3.9/http/client.py", line 1008, in _send_output
self.send(msg)
  File "/usr/lib64/python3.9/http/client.py", line 948, in send
self.connect()
  File "/usr/lib/python3.9/site-packages/docker/transport/unixconn.py",
line 43, in connect
sock.connect(self.unix_socket)
urllib3.exceptions.ProtocolError: ('Connection aborted.',
FileNotFoundError(2, 'No such file or directory'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.9/site-packages/docker/api/client.py", line 214,
in _retrieve_server_version
return self.version(api_version=False)["ApiVersion"]
  File "/usr/lib/python3.9/site-packages/docker/api/daemon.py", line 181,
in version
return self._result(self._get(url), json=True)
  File "/usr/lib/python3.9/site-packages/docker/utils/decorators.py", line
46, in inner
return f(self, *args, **kwargs)
  File "/usr/lib/python3.9/site-packages/docker/api/client.py", line 237,
in _get
return self.get(url, **self._set_request_timeout(kwargs))
  File "/usr/lib/python3.9/site-packages/requests/sessions.py", line 555,
in get
return self.request('GET', url, **kwargs)
  File "/usr/lib/python3.9/site-packages/requests/sessions.py", line 542,
in request
resp = self.send(prep, **send_kwargs)
  File "/usr/lib/python3.9/site-packages/requests/sessions.py", line 655,
in send
r = adapter.send(request, **kwargs)
  File "/usr/lib/python3.9/site-packages/requests/adapters.py", line 498,
in send
raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.',
FileNotFoundError(2, 'No such file or directory'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/bin/docker-compose", line 33, in 
sys.exit(load_entry_point('docker-compose==1.28.6', 'console_scripts',
'docker-compose')())
  File "/usr/lib/python3.9/site-packages/compose/cli/main.py", line 81, in
main
command_func()
  File "/usr/lib/python3.9/site-packages/compose/cli/main.py", line 

Re: PyFlink: Upload resource files to Flink cluster

2021-06-11 Thread Roman Khachatryan
Hi Sumeet,

Probably there is an issue with uploading the archive while submitting the job.
The commands and API usage look good to me.
Dian could you please confirm that?

Regards,
Roman

On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra
 wrote:
>
> Thank you Roman. Yes, that's what I am going to do.
>
> But I'm running into another issue... when I specify the --pyArchives option 
> on the command line, the job never gets submitted and is stuck forever. And 
> when I try to programmatically do this by calling add_python_archive(), the 
> job gets submitted but fails because the target directory is not found on the 
> UDF node. Flink is deployed on a K8S cluster in my case and the port 8081 is 
> forwarded to the localhost.
>
> Here's the command line I use:
>
> ~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python my_job.py  
> --pyArchives file:///path/to/schema.zip#schema
>
> And within the UDF I'm access the schema file as:
>
> read_schema('schema/my_schema.json')
>
> Or if I try using the API instead of the command-line, the call looks as:
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_python_archive('schema.zip', 'schema')
>
> Initially, my_job.py itself had its own command line options, and I was 
> thinking that might interfere with the overall Flink command line options, 
> but even after removing that I'm not able to submit the job anymore. However, 
> if I don't use the --pyArchives option and manually transfer the schema file 
> to a location on the UDF node, the job gets submitted and works as expected.
>
> Any reason why this might happen?
>
> Thanks,
> Sumeet
>
>
> On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> I think the second option is what you need. The documentation says
>> only zip format is supported.
>> Alternatively, you could upload the files to S3 or other DFS and
>> access from TMs and re-upload when needed.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
>>
>> Regards,
>> Roman
>>
>> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
>>  wrote:
>> >
>> > Hi,
>> >
>> > I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON 
>> > schema files actually). The path of this file can be passed into the UDTF, 
>> > but essentially this path needs to exist on the Task Manager node where 
>> > the task executes. What's the best way to upload these resource files? As 
>> > of now, my custom Flink image creates a fixed path with the required 
>> > resource files, but I'd like it to be run time configurable.
>> >
>> > There are 2 APIs available to load files when submitting a PyFlink job...
>> >
>> > stream_execution_environment.add_python_file() - Recommended to upload 
>> > files (.py etc) but doesn't let me configure the final path on the target 
>> > node. The files are added to PYTHONPATH, but it needs the UDTF function to 
>> > lookup for this file. I'd like to pass the file location into the UDTF 
>> > instead.
>> >
>> > stream_execution_environment.add_python_archive() - Appears to be more 
>> > generic, in the sense that it allows a target directory to be specified. 
>> > The documentation doesn't say anything about the contents of the archive, 
>> > so I'm guessing it could be any type of file. Is this what is needed for 
>> > my use case?
>> >
>> > Or is there any other recommended way to upload non-Python 
>> > dependencies/resources?
>> >
>> > Thanks in advance,
>> > Sumeet


Re: Flink 1.12: logging to Kafka with the default log4j2, how to distinguish JobManager and TaskManager logs? What parameters should be added?

2021-06-11 Thread yidan zhao
OK, done. It works now. As for distinguishing machines, you can use env or sys variables.
For JM and TM, you can modify the startup part of the flink-daemon.sh script and add a "-Dxxx=${DAEMON}-${id}"
JVM system property, then use ${sys:xxx} in the log4j configuration.

yidan zhao wrote on Thu, Jun 10, 2021 at 8:08 PM:
>
>  I also gave it a try, but strangely, my test program can write to Kafka via log4j just fine, while Flink simply cannot write to it.
>
> yidan zhao wrote on Thu, Jun 10, 2021 at 4:18 PM:
> >
> > @yujianbo Hi, could you share your log4j configuration? I would like to use it as a reference.
> > >
> > > yujianbo <15205029...@163.com> wrote on Thu, Jun 10, 2021 at 3:31 PM:
> > > >
> > > > Could you tell me? I currently know the layout has these configurable parameters; which one can distinguish JM and TM logs:
> > >
> > > > Details of the format specifiers:
> > > >   %p: the priority of the log message, i.e. DEBUG, INFO, WARN, ERROR, FATAL.
> > > >   %d: the date or time of the log event; the default format is ISO8601, and a format can also be given, e.g. %d{/MM/dd
> > > > HH:mm:ss,SSS}.
> > > >   %r: the number of milliseconds elapsed from application startup until the log message was output.
> > > >   %t: the name of the thread that produced the log event.
> > > >
> > > > %l: the location where the log event occurred, equivalent to the combination %c.%M(%F:%L), i.e. fully qualified class name, method, file name and line number in the code. For example: test.TestLog4j.main(TestLog4j.java:10).
> > > >   %c: the logger the log message belongs to, i.e. what was passed to getLogger().
> > > >   %C: the class the log message belongs to;
> > > >   %logger: log4j does not have this format;
> > > >   %M: the name of the method that produced the log message.
> > > >   %F: the name of the file in which the log message was produced.
> > > >   %L: the line number in the code.
> > > >   %m: the actual log message specified in the code.
> > > >   %n: a newline character, "\r\n" on Windows and "\n" on Unix.
> > > >   %x: the NDC (nested diagnostic context) associated with the current thread, used especially in multi-client, multi-threaded applications such as Java servlets.
> > > >   %%: a literal "%" character.
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/


1.13.1 jobmanager annotations by pod template does not work

2021-06-11 Thread 陳昌倬

Hi,

We found that jobmanager annotations defined by the pod template do not
work. However, annotations defined by kubernetes.jobmanager.annotations
[0] do work.

This behavior is different from document [1].


[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#kubernetes-jobmanager-annotations
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Add control mode for flink

2021-06-11 Thread Till Rohrmann
Thanks for starting this discussion. I do see the benefit of dynamically
configuring your Flink job and the cluster running it. Some of the use
cases which were mentioned here are already possible. E.g. adjusting the
log level dynamically can be done by configuring an appropriate logging
backend and then changing the logging properties (log4j 2 supports this for
example). Then the remaining use cases can be categorized into two
categories:

1) changing the job
2) changing the cluster configuration

1) would benefit from general control flow events which will be processed
by all operators. 2) would require some component sending some control
events to the other Flink processes.

Implementing the control flow events can already be done to a good extent
on the user level by using a connected stream and a user level-record type
which can distinguish between control events and normal records.
Admittedly, it is a bit of work, though.
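
(A hedged sketch of such a user-level control flow; the control event type, the sources, and the threshold logic are illustrative assumptions. The current config lives in a plain field here, so it is not persisted across failover, which is exactly the consistency concern discussed next.)

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ControlStreamSketch {

    // A user-level record type that is distinguishable from normal data records.
    public static class ControlEvent {
        public long threshold;  // e.g. a dynamically configurable filter condition
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> records = env.fromElements(1L, 50L, 200L);   // stand-in data source
        ControlEvent initial = new ControlEvent();
        initial.threshold = 100L;
        DataStream<ControlEvent> controls = env.fromElements(initial); // stand-in control source

        records.connect(controls)
                .flatMap(new CoFlatMapFunction<Long, ControlEvent, Long>() {
                    private long threshold = Long.MAX_VALUE;

                    @Override
                    public void flatMap1(Long value, Collector<Long> out) {
                        if (value >= threshold) {       // data path: apply the current config
                            out.collect(value);
                        }
                    }

                    @Override
                    public void flatMap2(ControlEvent control, Collector<Long> out) {
                        threshold = control.threshold;  // control path: update the config
                    }
                })
                .print();

        env.execute("control-stream-sketch");
    }
}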

I think persisting all of these changes would be very important because
otherwise, you might end up easily in an inconsistent state. For example,
assume you have changed the log level and now a subset of the TaskManagers
needs to be restarted. Now, all of a sudden some TaskManagers log on level
X and the others on level Y. The same applies to job changes. A regional
failover would have to restore the latest dynamically configured state. All
in all, this looks like a very complex and complicated task.

On the other hand, most of the described use cases should be realizable
with a restart of a job. So if Flink were able to quickly resume a job,
then we would probably not need this feature. Applying the changes to the
Flink and the job configuration and resubmitting the job would do the
trick. Hence, improving Flink's recovery speed could be an alternative
approach to this problem.

Cheers,
Till

On Fri, Jun 11, 2021 at 9:51 AM Jary Zhen  wrote:

>  big +1 for this feature,
>
>1. Reset kafka offset in certain cases.
>2. Stop checkpoint in certain cases.
>3. Change log level for debug.
>
>
刘建刚 wrote on Fri, Jun 11, 2021 at 12:17 PM:
>
>> Thanks for all the discussions and suggestions. Since the topic has
>> been discussed for about a week, it is time to have a conclusion and new
>> ideas are welcomed at the same time.
>> First, the topic starts with use cases in restful interface. The
>> restful interface supported many useful interactions with users, for
>> example as follows. It is an easy way to control the job compared with
>> broadcast api.
>>
>>1. Change data processing’ logic by dynamic configs, such as filter
>>condition.
>>2. Define some tools to control the job, such as QPS limit, sampling,
>>change log level and so on.
>>
>> Second, we broaden the topic to control flow in order to support all
>> kinds of control events besides the above user cases. There is a strong
>> demand to support custom (broadcast) events for iteration, SQL control
>> events and so on. As Xintong Song said, the key to the control flow lies as
>> follows:
>>
>>1. Who (which component) is responsible for generating the control
>>messages? It may be the jobmaster by some ways, the inner operator and so
>>on.
>>2. Who (which component) is responsible for reacting to the messages.
>>3. How do the messages propagate? Flink should support sending
>>control messages by channels.
>>4. When it comes to affecting the computation logics, how should the
>>control flow work together with the exact-once consistency.  To use the
>>checkpoint mechanism, control messages flowing from source to down tasks
>>may be a good idea.
>>
>> Third, a common and flexible control flow design requires good design
>> and implementation as a base. Future features and existing features should
>> both be considered. For future features, a common restful interface is
>> first needed to support dynamic configs. For existing features, There exist
>> checkpoint barriers, watermark and latency marker. They have some special
>> behaviors but also share a lot in common. The common logic should be
>> considered but maybe they should remain unchanged until the control flow is
>> stable.
>> Some other problems as follows:
>>
>>1. How to persist the control signals when the jobmaster fails? An
>>idea is to persist control signals in HighAvailabilityServices and replay
>>them after failover. The restful request should be non-blocking.
>>2. Should all the operators receive the control messages? All
>>operators should have the ability to receive upper operators' control
>>messages but maybe not process them. If we want to persist the control
>>message state, all the subtasks belonging to one operator should have the
>>same control events in order to rescale easily.
>>
>> For the next step, I will draft a FLIP with the scope of common
>> control flow framework. More discussions, ideas and problems are still
>> welcome.
>>
>> Thank you~
>>

Re:Re:Re: Syncing Flink SQL CDC data to MySQL

2021-06-11 Thread 东东



The case listed there is a shuffle before the sink that is not based on the primary key, for example when there is a join and the join condition is not the primary key. What does your scenario look like?




Also, https://issues.apache.org/jira/browse/FLINK-20374 is about sinking to ES, but the ES
connector does not support specifying sink.parallelism, so the sink parallelism is necessarily the upstream parallelism. The jdbc
connector can specify sink.parallelism; as long as it differs from the upstream parallelism, the runtime will do a hash
shuffle by the primary key, ensuring that records with the same pk go to the same sink task.


At 2021-06-11 15:57:29, "casel.chen" wrote:
>Quoting Leonard Xu's earlier answer:
>
>> Flink 1.13's jdbc connector adds the sink.parallelism
>> parameter. The question is: if the parallelism is increased, can it be guaranteed that records with the same primary key go to the same sink task? And what is the correct way to write the SQL?
>
>This applies not only to the sync scenario; in other scenarios you also need to be careful with the sink.parallelism parameter. Currently the framework does not guarantee that records with the same pk are sent to the same task; the user has to make sure that
>the pk defined on the sink stays consistent with the key of the upstream data shuffle (e.g. group key, join key),
>otherwise data may arrive out of order. The community is also working on solving this through plan derivation; see
>https://issues.apache.org/jira/browse/FLINK-20374 
>https://issues.apache.org/jira/browse/FLINK-22901 
>
>which shows that just adding sink.parallelism is not enough.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>At 2021-06-11 15:44:51, "JasonLee" <17610775...@163.com> wrote:
>>hi
>>
>>The sink side can be set via the sink.parallelism option.
>>
>>
>>
>>-
>>Best Wishes
>>JasonLee
>>--
>>Sent from: http://apache-flink.147419.n8.nabble.com/


After upgrading from Flink 1.12 to Flink 1.13.1, the Flink web UI's taskmanager detail page shows an error

2021-06-11 Thread yidan zhao
I upgraded Flink from 1.12 to 1.13.1, and the rest api
(http://xxx:8600/#/task-manager/xxx:34575-c53c6c/metrics) fails.
My standalone cluster includes 30 Jobmanagers and 30 Taskmanagers, and
I found the api only works on the one jobmanager that is currently the rest
api leader.

For example, jobmanager1 (http://jobmanager1:8600/#/...) and
jobmanager2 (http://jobmanager2:8600/#/...). The overview page works on all of
them, but the taskmanager detail page has this issue.

Here is the error log:


2021-06-10 13:00:27,395 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -

2021-06-10 13:00:27,399 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Preconfiguration:
2021-06-10 13:00:27,400 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -


RESOURCE_PARAMS extraction logs:
jvm_params: -Xmx2093796552 -Xms2093796552 -XX:MaxMetaspaceSize=536870912
dynamic_configs: -D jobmanager.memory.off-heap.size=268435456b -D
jobmanager.memory.jvm-overhead.min=322122552b -D
jobmanager.memory.jvm-metaspace.size=536870912b -D
jobmanager.memory.heap.size=2093796552b -D
jobmanager.memory.jvm-overhead.max=322122552b
logs: INFO  [] - Loading configuration property:
taskmanager.numberOfTaskSlots, 20
INFO  [] - Loading configuration property: cluster.evenly-spread-out-slots, true
INFO  [] - Loading configuration property: parallelism.default, 1
INFO  [] - Loading configuration property: jobmanager.memory.process.size, 3gb
INFO  [] - Loading configuration property:
jobmanager.memory.jvm-metaspace.size, 512mb
INFO  [] - Loading configuration property:
jobmanager.memory.jvm-overhead.fraction, 0.1
INFO  [] - Loading configuration property:
jobmanager.memory.jvm-overhead.min, 192mb
INFO  [] - Loading configuration property:
jobmanager.memory.jvm-overhead.max, 512mb
INFO  [] - Loading configuration property:
jobmanager.memory.off-heap.size, 256mb
INFO  [] - Loading configuration property: taskmanager.memory.process.size, 20gb
INFO  [] - Loading configuration property:
taskmanager.memory.jvm-metaspace.size, 512mb
INFO  [] - Loading configuration property:
taskmanager.memory.jvm-overhead.fraction, 0.1
INFO  [] - Loading configuration property:
taskmanager.memory.jvm-overhead.min, 192mb
INFO  [] - Loading configuration property:
taskmanager.memory.jvm-overhead.max, 512mb
INFO  [] - Loading configuration property: taskmanager.memory.segment-size, 32kb
INFO  [] - Loading configuration property:
taskmanager.memory.managed.fraction, 0.4
INFO  [] - Loading configuration property: taskmanager.memory.managed.size, 64mb
INFO  [] - Loading configuration property:
taskmanager.memory.network.fraction, 0.1
INFO  [] - Loading configuration property: taskmanager.memory.network.min, 1gb
INFO  [] - Loading configuration property: taskmanager.memory.network.max, 2gb
INFO  [] - Loading configuration property:
taskmanager.memory.framework.off-heap.size, 256mb
INFO  [] - Loading configuration property:
taskmanager.memory.task.off-heap.size, 512mb
INFO  [] - Loading configuration property:
taskmanager.memory.framework.heap.size, 256mb
INFO  [] - Loading configuration property: high-availability, zookeeper
INFO  [] - Loading configuration property:
high-availability.storageDir, bos://flink-bucket/flink/ha
INFO  [] - Loading configuration property:
high-availability.zookeeper.quorum,
bjhw-aisecurity-cassandra01.bjhw:9681,bjhw-aisecurity-cassandra02.bjhw:9681,bjhw-aisecurity-cassandra03.bjhw:9681,bjhw-aisecurity-cassandra04.bjhw:9681,bjhw-aisecurity-cassandra05.bjhw:9681
INFO  [] - Loading configuration property:
high-availability.zookeeper.path.root, /flink
INFO  [] - Loading configuration property:
high-availability.cluster-id, opera_upd_FlinkTestJob3
INFO  [] - Loading configuration property: web.checkpoints.history, 100
INFO  [] - Loading configuration property: state.checkpoints.num-retained, 100
INFO  [] - Loading configuration property: state.checkpoints.dir,
bos://flink-bucket/flink/default-checkpoints
INFO  [] - Loading configuration property: state.savepoints.dir,
bos://flink-bucket/flink/default-savepoints
INFO  [] - Loading configuration property:
jobmanager.execution.failover-strategy, region
INFO  [] - Loading configuration property: web.submit.enable, false
INFO  [] - Loading configuration property: jobmanager.archive.fs.dir,
bos://flink-bucket/flink/completed-jobs/opera_upd_FlinkTestJob3
INFO  [] - Loading configuration property:
historyserver.archive.fs.dir,
bos://flink-bucket/flink/completed-jobs/opera_upd_FlinkTestJob3
INFO  [] - Loading configuration property:
historyserver.archive.fs.refresh-interval, 1
INFO  [] - Loading configuration property: rest.port, 8600
INFO  [] - Loading configuration property: historyserver.web.port, 8700
INFO  [] - Loading configuration property:
high-availability.jobmanager.port, 9318
INFO  [] - Loading configuration property: blob.server.port, 9320
INFO  [] - Loading configuration property: 

Serializer consumed more bytes than the record had

2021-06-11 Thread silence
Flink version: 1.12
The exception is as follows:
java.io.IOException: Can't get next record for channel
InputChannelInfo{gateIdx=0, inputChannelIdx=0}
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:339)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:111)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:86)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:163)
... 8 more
Caused by: java.lang.IndexOutOfBoundsException: pos: 140427053897089,
length: 16805746, index: 17, offset: 0
at
org.apache.flink.core.memory.HybridMemorySegment.get(HybridMemorySegment.java:224)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:101)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readFully(NonSpanningWrapper.java:92)
at
org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.deserialize(BinaryRowDataSerializer.java:100)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:108)
at
org.apache.flink.table.runtime.typeutils.RowDataSerializer.deserialize(RowDataSerializer.java:48)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
at
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
at
org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
... 11 more

What is the likely cause?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Syncing Flink SQL CDC data to MySQL

2021-06-11 Thread casel.chen
Quoting Leonard Xu's earlier answer:

> Flink 1.13's jdbc connector adds the sink.parallelism
> parameter. The question is: if the parallelism is increased, can it be guaranteed that records with the same primary key go to the same sink task? And what is the correct way to write the SQL?

This applies not only to the sync scenario; in other scenarios you also need to be careful with the sink.parallelism parameter. Currently the framework does not guarantee that records with the same pk are sent to the same task; the user has to make sure that
the pk defined on the sink stays consistent with the key of the upstream data shuffle (e.g. group key, join key),
otherwise data may arrive out of order. The community is also working on solving this through plan derivation; see
https://issues.apache.org/jira/browse/FLINK-20374 
https://issues.apache.org/jira/browse/FLINK-22901 

which shows that just adding sink.parallelism is not enough.














At 2021-06-11 15:44:51, "JasonLee" <17610775...@163.com> wrote:
>hi
>
>The sink side can be set via the sink.parallelism option.
>
>
>
>-
>Best Wishes
>JasonLee
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Add control mode for flink

2021-06-11 Thread Jary Zhen
 big +1 for this feature,

   1. Reset kafka offset in certain cases.
   2. Stop checkpoint in certain cases.
   3. Change log level for debug.


刘建刚 wrote on Fri, Jun 11, 2021 at 12:17 PM:

> Thanks for all the discussions and suggestions. Since the topic has
> been discussed for about a week, it is time to have a conclusion and new
> ideas are welcomed at the same time.
> First, the topic starts with use cases in restful interface. The
> restful interface supported many useful interactions with users, for
> example as follows. It is an easy way to control the job compared with
> broadcast api.
>
>1. Change data processing’ logic by dynamic configs, such as filter
>condition.
>2. Define some tools to control the job, such as QPS limit, sampling,
>change log level and so on.
>
> Second, we broaden the topic to control flow in order to support all
> kinds of control events besides the above user cases. There is a strong
> demand to support custom (broadcast) events for iteration, SQL control
> events and so on. As Xintong Song said, the key to the control flow lies as
> follows:
>
>1. Who (which component) is responsible for generating the control
>messages? It may be the jobmaster by some ways, the inner operator and so
>on.
>2. Who (which component) is responsible for reacting to the messages.
>3. How do the messages propagate? Flink should support sending control
>messages by channels.
>4. When it comes to affecting the computation logics, how should the
>control flow work together with the exact-once consistency.  To use the
>checkpoint mechanism, control messages flowing from source to down tasks
>may be a good idea.
>
> Third, a common and flexible control flow design requires good design
> and implementation as a base. Future features and existing features should
> both be considered. For future features, a common restful interface is
> first needed to support dynamic configs. For existing features, There exist
> checkpoint barriers, watermark and latency marker. They have some special
> behaviors but also share a lot in common. The common logic should be
> considered but maybe they should remain unchanged until the control flow is
> stable.
> Some other problems as follows:
>
>1. How to persist the control signals when the jobmaster fails? An
>idea is to persist control signals in HighAvailabilityServices and replay
>them after failover. The restful request should be non-blocking.
>2. Should all the operators receive the control messages? All
>operators should have the ability to receive upper operators' control
>messages but maybe not process them. If we want to persist the control
>message state, all the subtasks belonging to one operator should have the
>same control events in order to rescale easily.
>
> For the next step, I will draft a FLIP with the scope of common
> control flow framework. More discussions, ideas and problems are still
> welcome.
>
> Thank you~
>
> Jiangang Liu
>
>
>
>
>
>
>
> Xintong Song wrote on Wed, Jun 9, 2021 at 12:01 PM:
>
>> >
>> > 2. There are two kinds of existing special elements, special stream
>> > records (e.g. watermarks) and events (e.g. checkpoint barrier). They all
>> > flow through the whole DAG, but events needs to be acknowledged by
>> > downstream and can overtake records, while stream records are not). So
>> I’m
>> > wondering if we plan to unify the two approaches in the new control flow
>> > (as Xintong mentioned both in the previous mails)?
>> >
>>
>> TBH, I don't really know yet. We feel that the control flow is a
>> non-trivial topic and it would be better to bring it up publicly as early
>> as possible, while the concrete plan is still on the way.
>>
>> Personally, I'm leaning towards not touching the existing watermarks and
>> checkpoint barriers in the first step.
>> - I'd expect the control flow to be introduced as an experimental feature
>> that takes time to stabilize. It would be better that the existing
>> important features like checkpointing and watermarks stay unaffected.
>> - Checkpoint barriers are a little different, as other control messages
>> somehow rely on it to achieve exactly once consistency. Without the
>> concrete design, I'm not entirely sure whether it can be properly modeled
>> as a special case of general control messages.
>> - Watermarks are probably similar to the other control messages. However,
>> it's already exposed to users as public APIs. If we want to migrate it to
>> the new control flow, we'd be very careful not to break any compatibility.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Jun 9, 2021 at 11:30 AM Steven Wu  wrote:
>>
>> > > producing control events from JobMaster is similar to triggering a
>> > savepoint.
>> >
>> > Paul, here is what I see the difference. Upon job or jobmanager
>> recovery,
>> > we don't need to recover and replay the savepoint trigger signal.
>> >
>> > On Tue, Jun 8, 2021 at 8:20 PM 

Re: Re:Re: Syncing Flink SQL CDC data to MySQL

2021-06-11 Thread JasonLee
hi

The sink side can be set via the sink.parallelism option.
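
(A hedged sketch of where that option goes, using a jdbc sink table created through the Table API; the schema and connection settings are placeholders, and 'sink.parallelism' on the jdbc connector needs Flink 1.13+.)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SinkParallelismSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Illustrative MySQL sink; only 'sink.parallelism' is the point here.
        tEnv.executeSql(
                "CREATE TABLE mysql_sink (" +
                "  id BIGINT," +
                "  name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/db'," +
                "  'table-name' = 'target_table'," +
                "  'username' = 'user'," +
                "  'password' = 'secret'," +
                "  'sink.parallelism' = '4'" +
                ")");
    }
}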



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to gracefully handle job recovery failures

2021-06-11 Thread Till Rohrmann
Hi Li,

Roman is right about Flink's behavior and what you can do about it. The
idea behind its current behavior is the following: If Flink cannot recover
a job, it is very hard for it to tell whether it is due to an intermittent
problem or a permanent one. No matter how often you retry, you can always
run into the situation that you give up too early. Since we believe that
this would be a very surprising behavior because it effectively means that
Flink can forget about jobs in case of a recovery, we decided that this
situation requires the intervention of the user to resolve the situation.
By enforcing the user to make a decision, we make this problem very
explicit and require her to think about the situation. I hope this makes
sense.

So in your case, what you have to do is to remove the relevant ZooKeeper
zNode which contains the pointer to the submitted job graph file. That way,
Flink will no longer try to recover this job. I do agree that this is a bit
cumbersome and it could definitely help to offer a small tool to do this
kind of cleanup task.

Cheers,
Till

On Fri, Jun 11, 2021 at 8:24 AM Roman Khachatryan  wrote:

> Hi Li,
>
> If I understand correctly, you want the cluster to proceed recovery,
> skipping some non-recoverable jobs (but still recover others).
> The only way I can think of is to remove the corresponding nodes in
> ZooKeeper which is not very safe.
>
> I'm pulling in Robert and Till who might know better.
>
> Regards,
> Roman
>
>
> On Thu, Jun 10, 2021 at 8:56 PM Li Peng  wrote:
> >
> > Hi Roman,
> >
> > Is there a way to abandon job recovery after a few tries? By that I mean
> that this problem was fixed by me restarting the cluster and not try to
> recover a job. Is there some setting that emulates what I did, so I don't
> need to do manual intervention if this happens again??
> >
> > Thanks,
> > Li
> >
> > On Thu, Jun 10, 2021 at 9:50 AM Roman Khachatryan 
> wrote:
> >>
> >> Hi Li,
> >>
> >> The missing file is a serialized job graph and the job recovery can't
> >> proceed without it.
> >> Unfortunately, the cluster can't proceed if one of the jobs can't
> recover.
> >>
> >> Regards,
> >> Roman
> >>
> >> On Thu, Jun 10, 2021 at 6:02 AM Li Peng  wrote:
> >> >
> >> > Hey folks, we have a cluster with HA mode enabled, and recently after
> doing a zookeeper restart, our Kafka cluster (Flink v. 1.11.3, Scala v.
> 2.12) crashed and was stuck in a crash loop, with the following error:
> >> >
> >> > 2021-06-10 02:14:52.123 [cluster-io-thread-1] ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error
> occurred in the cluster entrypoint.
> >> > java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not recover job with job
> id .
> >> > at
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
> >> > at
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
> >> > at
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
> >> > at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> >> > at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> >> > at java.base/java.lang.Thread.run(Thread.java:834)
> >> > Caused by: org.apache.flink.util.FlinkRuntimeException: Could not
> recover job with job id .
> >> > at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
> >> > at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
> >> > at
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:200)
> >> > at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115)
> >> > at
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
> >> > ... 3 common frames omitted
> >> > Caused by: org.apache.flink.util.FlinkException: Could not retrieve
> submitted JobGraph from state handle under
> /. This indicates that the retrieved state
> handle is broken. Try cleaning the state handle store.
> >> > at
> org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:192)
> >> > at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146)
> >> > ... 7 common frames omitted
> >> > Caused by: java.io.FileNotFoundException: No such file or directory:
> s3a://myfolder/recovery/myservice/2021-05-25T04:36:33Z/submittedJobGraph06ea8512c493
> >> > at
> 

Re:Re: Syncing Flink SQL CDC data to MySQL

2021-06-11 Thread casel.chen
The existing data in the single table is indeed very large, but the business side does not need to sync the existing data; syncing only the incremental changes is enough.


How do I change the parallelism of the downstream sink in Flink SQL? Via an SQL hint?

















At 2021-06-11 11:08:58, "东东" wrote:
>1. It is worth considering other solutions. If the existing data in a single table is very large, then leaving aside the downstream sink issue, the snapshot phase alone may take too long, and once it fails you can only start over from scratch (because checkpoints cannot be taken during that phase), so the success rate of the job is questionable (it mainly depends on how large the existing data really is). Also, it is fine if the global lock can be acquired; if it cannot, the table will be locked until all existing data has been copied, which basically means the business is down.
>2. If it is just a simple insert into xxx select
>xxx, there is nothing to worry about: when the upstream and downstream parallelisms differ, the runtime will hash by the primary key if one is defined.
>
>
>At 2021-06-08 14:05:17, "casel.chen" wrote:
>>Flink 1.12's jdbc connector does not support the sink.parallelism parameter. Does that mean it can only keep the same parallelism of 1 as the upstream cdc
>>connector? If so, the downstream sink becomes a bottleneck, because the table holds a large amount of historical data, syncing the existing data to the target table takes quite some time, and new business data keeps coming in, so it cannot catch up. How can this be solved?
>>Flink 1.13's jdbc connector adds the sink.parallelism
>>parameter. The question is: if the parallelism is increased, can it be guaranteed that records with the same primary key go to the same sink task? And what is the correct way to write the SQL?


Re: A question about CEP event processing

2021-06-11 Thread yue ma
Hello, with event time this is expected behaviour. When an event arrives, matching is not triggered immediately; instead a timer is registered and the data is buffered. Evaluation only starts once a later event advances the watermark and the timer fires.
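If you do not want to wait for the watermark in this test, one (untested) variant on Flink 1.12 is to evaluate the pattern in processing time, so the third input triggers the match right away. Only the CEP part of the quoted Run3 below changes:

    // Sketch only: same source and pattern as in the quoted code below, but the
    // pattern is evaluated in processing time (PatternStream#inProcessingTime is
    // available since Flink 1.12), so no watermark is needed to fire the match.
    final PatternStream<String> patternStream =
            CEP.pattern(source, pattern).inProcessingTime();
    patternStream.select(new PatternSelectFunction<String, Object>() {
        @Override
        public Object select(Map<String, List<String>> match) {
            return match.get("begin");
        }
    }).print("result ");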

sherlock zw wrote on Thu, Jun 10, 2021 at 9:55 PM:

> Hi all, a question about CEP: in my scenario, entering the same string three times should print the matched List
> once, but a fourth record always has to be entered before the Pattern's select function fires and prints the List.
>
> The implementation code is as follows:
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.cep.CEP;
> import org.apache.flink.cep.PatternSelectFunction;
> import org.apache.flink.cep.PatternStream;
> import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import java.util.List;
> import java.util.Map;
>
> public class Run3 {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>
>         final DataStream<String> source = env
>                 .socketTextStream("localhost", 9999)  // port was not legible in the original mail
>                 .assignTimestampsAndWatermarks(
>                         WatermarkStrategy.<String>forMonotonousTimestamps()
>                                 .withTimestampAssigner(
>                                         (String s, long ts) -> System.currentTimeMillis()))
>                 .keyBy(s -> s);
>         source.print("source ");
>
>         final Pattern<String, String> pattern = Pattern
>                 .<String>begin("begin", AfterMatchSkipStrategy.skipPastLastEvent())
>                 .where(new SimpleCondition<String>() {
>                     @Override
>                     public boolean filter(String s) throws Exception {
>                         return true;
>                     }
>                 })
>                 .times(3);
>
>         final PatternStream<String> patternStream = CEP.pattern(source, pattern);
>         patternStream.select(new PatternSelectFunction<String, Object>() {
>             @Override
>             public Object select(Map<String, List<String>> pattern) {
>                 return pattern.get("begin");
>             }
>         }).print("result ");
>
>         env.execute();
>     }
> }
>
>
>
> Environment:
>
> Flink 1.12.2
>
> OS: Windows 10
>
> IDE: IDEA 2021.1.2
>
> Flink's default event time is used; the watermark is monotonically increasing and the timestamps come from the system clock.
>
>
>
> The run output is as follows:
>
>
>


Re:Re: How to implement a delayed dimension-table join in flink sql?

2021-06-11 Thread casel.chen
Is there an example? A link to related material would also do. (a rough sketch follows at the end of this message)

















On 2021-06-11 12:40:10, "chenchencc" <1353637...@qq.com> wrote:
>Using event time is what gives you the delay
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
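For what it's worth, a rough illustration of the event-time approach mentioned above is an event-time temporal join against a versioned dimension table: a fact row is only joined once the dimension table's watermark has caught up to the row's event time, which is what produces the "delayed" effect. All table and column names below are invented, and both tables are assumed to be declared elsewhere (the dimension table needs a primary key and a watermark, e.g. when it is backed by a CDC source):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Event-time temporal join: each order is joined with the version of the
        // rate that was valid at the order's event time, waiting for the dimension
        // side to catch up if necessary.
        tEnv.executeSql(
                "SELECT o.order_id, o.price, r.rate " +
                "FROM orders AS o " +
                "LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r " +
                "ON o.currency = r.currency").print();
    }
}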


Re:Re: Error initializing HiveCatalog in the application: "URI is not hierarchical"

2021-06-11 Thread casel.chen
hive-site.xml is not on the classpath; it is loaded from the configured directory: hiveConfig.getHiveConfDir(), e.g.
/opt/hive/conf, and that directory contains the hive-site.xml.
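A quick way to double-check Rui Li's suggestion is to list every hive-site.xml visible to the application's classloader; a "jar:file:/..." entry would mean a copy is also packaged inside the fat jar, which is a common cause of "URI is not hierarchical" when code later tries to turn such a URL into a File. A small sketch:

import java.net.URL;
import java.util.Enumeration;

public class FindHiveSite {
    public static void main(String[] args) throws Exception {
        Enumeration<URL> urls = Thread.currentThread()
                .getContextClassLoader()
                .getResources("hive-site.xml");
        while (urls.hasMoreElements()) {
            // Prints e.g. file:/opt/hive/conf/hive-site.xml or
            // jar:file:/path/to/app.jar!/hive-site.xml
            System.out.println(urls.nextElement());
        }
    }
}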

















On 2021-06-11 13:59:20, "Rui Li" wrote:
>Hi,
>
>Could you check whether there is a hive-site.xml file inside the jar?
>
>On Fri, Jun 11, 2021 at 10:37 AM casel.chen  wrote:
>
>> I'm using HiveCatalog in a spring boot application to list databases and tables. Initializing HiveCatalog with the hive parameters throws the error below:
>> hiveCatalog = new HiveCatalog(hiveConfig.getCatalogName(),
>> hiveConfig.getDefaultDatabase(), hiveConfig.getHiveConfDir());
>> hiveCatalog.open();
>> On the machine running the spring boot application I only prepared a hive-site.xml under /opt/hive/conf. Is some other configuration missing?
>>
>>
>>  2021-06-11 10:32:20.044 - [http-nio-8080-exec-4] INFO
>> org.apache.hadoop.hive.conf.HiveConf - Found configuration file
>> file:/opt/hive/conf/hive-site.xml
>> org.springframework.web.util.NestedServletException: Handler dispatch
>> failed; nested exception is java.lang.ExceptionInInitializerError
>> at
>> org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1055)
>> at
>> org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
>> at
>> org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
>> at
>> org.springframework.web.servlet.FrameworkServlet.doPut(FrameworkServlet.java:920)
>> at javax.servlet.http.HttpServlet.service(HttpServlet.java:655)
>> at
>> org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
>> at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
>> at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
>> at
>> org.springframework.web.filter.CorsFilter.doFilterInternal(CorsFilter.java:92)
>> at
>> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
>> at
>> org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
>> at
>> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
>> at
>> org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
>> at
>> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
>> at
>> org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)
>> at
>> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
>> at
>> org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
>> at
>> org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
>> at
>> org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
>> at
>> org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
>> at
>> org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
>> at
>> org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
>> at
>> org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
>> at
>> org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
>> at
>> org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
>> at org.apache.catalina.valves.RemoteIpValve.invoke(RemoteIpValve.java:747)
>> at
>> org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
>> at
>> org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
>> at
>> org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
>> at
>> 

Re: PyFlink: Upload resource files to Flink cluster

2021-06-11 Thread Sumeet Malhotra
Thank you Roman. Yes, that's what I am going to do.

But I'm running into another issue... when I specify the *--pyArchives*
option on the command line, the job never gets submitted and is stuck
forever. And when I try to programmatically do this by calling
*add_python_archive()*, the job gets submitted but fails because the target
directory is not found on the UDF node. Flink is deployed on a K8S cluster
in my case and the port 8081 is forwarded to the localhost.

Here's the command line I use:

~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python
my_job.py  --pyArchives file:///path/to/schema.zip#schema

And within the UDF I access the schema file as:

read_schema('schema/my_schema.json')

Or if I try using the API instead of the command-line, the call looks as:

env = StreamExecutionEnvironment.get_execution_environment()
env.add_python_archive('schema.zip', 'schema')

Initially, my_job.py itself had its own command line options, and I was
thinking that might interfere with the overall Flink command line options,
but even after removing that I'm not able to submit the job anymore.
However, if I don't use the --pyArchives option and manually transfer the
schema file to a location on the UDF node, the job gets submitted and works
as expected.

Any reason why this might happen?

Thanks,
Sumeet


On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan  wrote:

> Hi,
>
> I think the second option is what you need. The documentation says
> only zip format is supported.
> Alternatively, you could upload the files to S3 or other DFS and
> access from TMs and re-upload when needed.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
>
> Regards,
> Roman
>
> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
>  wrote:
> >
> > Hi,
> >
> > I'm using UDTFs in PyFlink, that depend upon a few resource files (JSON
> schema files actually). The path of this file can be passed into the UDTF,
> but essentially this path needs to exist on the Task Manager node where the
> task executes. What's the best way to upload these resource files? As of
> now, my custom Flink image creates a fixed path with the required resource
> files, but I'd like it to be run time configurable.
> >
> > There are 2 APIs available to load files when submitting a PyFlink job...
> >
> > stream_execution_environment.add_python_file() - Recommended to upload
> files (.py etc) but doesn't let me configure the final path on the target
> node. The files are added to PYTHONPATH, but it needs the UDTF function to
> lookup for this file. I'd like to pass the file location into the UDTF
> instead.
> >
> > stream_execution_environment.add_python_archive() - Appears to be more
> generic, in the sense that it allows a target directory to be specified.
> The documentation doesn't say anything about the contents of the archive,
> so I'm guessing it could be any type of file. Is this what is needed for my
> use case?
> >
> > Or is there any other recommended way to upload non-Python
> dependencies/resources?
> >
> > Thanks in advance,
> > Sumeet
>


Re: How to gracefully handle job recovery failures

2021-06-11 Thread Roman Khachatryan
Hi Li,

If I understand correctly, you want the cluster to proceed with recovery,
skipping the non-recoverable jobs (but still recovering the others).
The only way I can think of is to remove the corresponding nodes in
ZooKeeper which is not very safe.
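If it ever comes to that, here is a heavily hedged sketch of what removing such a node could look like with the plain ZooKeeper client. The znode path depends entirely on your HA configuration (high-availability.zookeeper.path.root, high-availability.cluster-id, etc.), so the connection string, path and job id below are just placeholders, and as noted above this is not a safe operation:

import org.apache.zookeeper.ZooKeeper;

public class DropBrokenJobGraphNode {
    public static void main(String[] args) throws Exception {
        // Placeholder connection string and path; adjust to your setup. The delete
        // fails with NotEmptyException if the node still has (lock) children.
        ZooKeeper zk = new ZooKeeper("zk-host:2181", 30000, event -> { });
        zk.delete("/flink/<cluster-id>/jobgraphs/<job-id>", -1); // -1 = any version
        zk.close();
    }
}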

I'm pulling in Robert and Till who might know better.

Regards,
Roman


On Thu, Jun 10, 2021 at 8:56 PM Li Peng  wrote:
>
> Hi Roman,
>
> Is there a way to abandon job recovery after a few tries? By that I mean that
> this problem was fixed by restarting the cluster without trying to recover the
> job. Is there some setting that emulates what I did, so I don't need to intervene
> manually if this happens again?
>
> Thanks,
> Li
>
> On Thu, Jun 10, 2021 at 9:50 AM Roman Khachatryan  wrote:
>>
>> Hi Li,
>>
>> The missing file is a serialized job graph and the job recovery can't
>> proceed without it.
>> Unfortunately, the cluster can't proceed if one of the jobs can't recover.
>>
>> Regards,
>> Roman
>>
>> On Thu, Jun 10, 2021 at 6:02 AM Li Peng  wrote:
>> >
>> > Hey folks, we have a cluster with HA mode enabled, and recently after 
>> > doing a zookeeper restart, our Kafka cluster (Flink v. 1.11.3, Scala v. 
>> > 2.12) crashed and was stuck in a crash loop, with the following error:
>> >
>> > 2021-06-10 02:14:52.123 [cluster-io-thread-1] ERROR 
>> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error 
>> > occurred in the cluster entrypoint.
>> > java.util.concurrent.CompletionException: 
>> > org.apache.flink.util.FlinkRuntimeException: Could not recover job with 
>> > job id .
>> > at 
>> > java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>> > at 
>> > java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
>> > at 
>> > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
>> > at 
>> > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> > at 
>> > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> > at java.base/java.lang.Thread.run(Thread.java:834)
>> > Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover 
>> > job with job id .
>> > at 
>> > org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:149)
>> > at 
>> > org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:125)
>> > at 
>> > org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:200)
>> > at 
>> > org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:115)
>> > at 
>> > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>> > ... 3 common frames omitted
>> > Caused by: org.apache.flink.util.FlinkException: Could not retrieve 
>> > submitted JobGraph from state handle under 
>> > /. This indicates that the retrieved state 
>> > handle is broken. Try cleaning the state handle store.
>> > at 
>> > org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore.recoverJobGraph(ZooKeeperJobGraphStore.java:192)
>> > at 
>> > org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJob(SessionDispatcherLeaderProcess.java:146)
>> > ... 7 common frames omitted
>> > Caused by: java.io.FileNotFoundException: No such file or directory: 
>> > s3a://myfolder/recovery/myservice/2021-05-25T04:36:33Z/submittedJobGraph06ea8512c493
>> > at 
>> > org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
>> > at 
>> > org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>> > at 
>> > org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
>> > at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
>> > at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
>> > at 
>> > org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:120)
>> > at 
>> > org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.open(HadoopFileSystem.java:37)
>> > at 
>> > org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.open(PluginFileSystemFactory.java:127)
>> > at 
>> > org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>> > at 
>> > org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:65)
>> > at 
>> > org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
>> > at 
>> >