Re: Unsubscribe

2022-06-08 Thread Lijie Wang
Hi, to unsubscribe please send an email to user-zh-unsubscr...@flink.apache.org, not to
user-zh@flink.apache.org

Best,
Lijie

黎永康 wrote on Thu, Jun 9, 2022, 09:56:

> Unsubscribe


Re:Re: flink k8s ha

2022-06-08 Thread json
OK, I see the point of keeping the HA configuration now. But this still feels like a bug. See my problem: on restart the job reports that
/high-availability.storageDir/task/completedCheckpointe5c125ad20ea
cannot be found, yet the HA directory on OSS only contains
/high-availability.storageDir/task/completedCheckpointacdfb4309903, i.e. the information in the HA configmaps
and the files under the high-availability.storageDir directory are out of sync.
On 2022-06-08 23:06:03, "Weihua Hu" wrote:
>Hi,
>Deleting the deployment removes the Pods, Services, flink-conf configmap, etc. associated with it. However, the
>HA-related configmaps are not given an owner reference, so they are not deleted. The main purpose is to let the
>cluster recover from the previous HA state when it restarts. See the official documentation for more details [1]
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up
>Best,
>Weihua
>
>
>On Wed, Jun 8, 2022 at 4:24 PM json <18042304...@163.com> wrote:
>
>> The configmaps are as follows:
>> sql-test--jobmanager-leader
>> sql-test-resourcemanager-leader
>> sql-test-restserver-leader
>> sql-test-dispatcher-leader
>>
>>
>>
>> On 2022-06-08 15:42:52, "json" <18042304...@163.com> wrote:
>>
>> Flink 1.13.6 on k8s in application mode, with HA configured:
>> high-availability:
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> high-availability.storageDir: oss
>> This creates configmaps on k8s.
>>
>>
>> 1. After the job's deployment is deleted on k8s, why do these configmaps still exist? (The job is gone, so this HA data shouldn't be needed anymore, right?)
>> 2. When the job is restarted, it still reads the HA configuration from these configmaps. That logic also seems odd: why should a restarted job read the previous HA information?
>>
>> Why do I care about this? Because I ran into a problem:
>> On restart the job fails because it cannot find the file
>> /high-availability.storageDir/task/completedCheckpointe5c125ad20ea,
>> but OSS does contain the file
>> /high-availability.storageDir/task/completedCheckpointe/completedCheckpointacdfb4309903
>> which kept my job failing; it only ran normally after I deleted the configmaps above.
>>
>>
>>
>>
>>
>>
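
For reference, the manual cleanup mentioned above boils down to deleting the leader configmaps; a minimal sketch using the names listed in this thread (the documentation in [1] also covers HA data clean-up):

kubectl delete configmap sql-test--jobmanager-leader sql-test-resourcemanager-leader sql-test-restserver-leader sql-test-dispatcher-leader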


Unsubscribe

2022-06-08 Thread 黎永康
Unsubscribe

Re: Re: Question about whether the sql-client pyexec parameter takes effect

2022-06-08 Thread RS
Hi,
That was the problem; it works now. Though I'm not sure why the UDF interpreter has to be configured separately.



Thx


On 2022-06-08 13:29:48, "Dian Fu" wrote:
>There are two parameters that specify a Python interpreter:
>1) -pyexec specifies the path of the Python interpreter used to run Python UDFs during job execution.
>2) -pyclientexec specifies the path of the Python interpreter the client uses when compiling the job; for details see:
>https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/dependency_management/#python-interpreter-of-client
>
>You could try adding the -pyclientexec parameter as well.
>
>On Tue, Jun 7, 2022 at 11:24 AM RS  wrote:
>
>> Hi,
>>
>>
>> Environment:
>> - flink-1.14.3, standalone cluster
>> - Python 2 is the server default; Python 3.6.8 is also installed
>> - /xxx/bin/python3 is a virtual environment created from Python 3
>>
>>
>> I'm testing a PyFlink UDF with sql-client; a custom function f1 is defined in /xxx/p.py.
>> Startup command:
>> ./bin/sql-client.sh -pyfs file:///xxx/p.py -pyexec /xxx/bin/python3
>> The pyexec option specifies Python 3 as the interpreter to use.
>>
>>
>> Executing a statement fails with the following error:
>> Flink SQL> create temporary function fun1 as 'p.f1' language python;
>> [INFO] Execute statement succeed.
>> Flink SQL> select fun1('a',1,'s');
>> Traceback (most recent call last):
>>   File "/usr/lib64/python2.7/runpy.py", line 151, in _run_module_as_main
>> mod_name, loader, code, fname = _get_module_details(mod_name)
>>   File "/usr/lib64/python2.7/runpy.py", line 101, in _get_module_details
>> loader = get_loader(mod_name)
>>   File "/usr/lib64/python2.7/pkgutil.py", line 464, in get_loader
>> return find_loader(fullname)
>>   File "/usr/lib64/python2.7/pkgutil.py", line 474, in find_loader
>> for importer in iter_importers(fullname):
>>   File "/usr/lib64/python2.7/pkgutil.py", line 430, in iter_importers
>> __import__(pkg)
>>   File "/home/flink-1.14.3/opt/python/pyflink.zip/pyflink/__init__.py",
line 26, in <module>
>> RuntimeError: Python versions prior to 3.6 are not supported for PyFlink
>> [sys.version_info(major=2, minor=7, micro=5, releaselevel='final',
>> serial=0)].
>> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.IllegalStateException: Instantiating python function 'p.f1'
>> failed.
>>
>>
>> The error shows Python 2 being used, not the Python 3 configured via the parameter. How do I make pyexec take effect?
>>
>>
>> Thx
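
Putting Dian Fu's suggestion together, a sketch of the resulting startup command, reusing the thread's virtualenv path for both interpreters:

./bin/sql-client.sh -pyfs file:///xxx/p.py -pyexec /xxx/bin/python3 -pyclientexec /xxx/bin/python3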


Re: How to handle deletion of items using PyFlink SQL?

2022-06-08 Thread Dian Fu
Hi John,

If you are using Table API & SQL, the framework handles the RowKind and
it's transparent to you. So usually you don't need to handle RowKind in
Table API & SQL.

Regards,
Dian

On Thu, Jun 9, 2022 at 6:56 AM John Tipper  wrote:

> Hi Xuyang,
>
> Thank you very much, I’ll experiment tomorrow. Do you happen to know
> whether there is a Python example of udtf() with a RowKind being set (or
> whether it’s supported)?
>
> Many thanks,
>
> John
>
> Sent from my iPhone
>
> On 8 Jun 2022, at 16:41, Xuyang  wrote:
>
> 
> Hi, John.
> What about using a UDTF [1]?
> In your UDTF, save all resources from a crawl as a set or map, s1. When t=2
> arrives, the new resources collected by the crawl form s2. I think what
> you want is the deletion data, i.e. 's1' - 's2'.
> So just loop to find the deleted entries and emit RowData from the UDTF's
> 'eval' function; the RowData can be sent with RowKind
> 'DELETE' [2]. 'DELETE' tells the downstream that this value has been
> deleted.
>
> I'll be glad if this helps you.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-functions
> [2]
> https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-core/src/main/java/org/apache/flink/types/RowKind.java#L52
>
>
>
> --
> Best!
> Xuyang
>
>
> At 2022-06-08 20:06:17, "John Tipper"  wrote:
>
> Hi all,
>
> I have some reference data that is periodically emitted by a crawler
> mechanism into an upstream Kinesis data stream, where those rows are used
> to populate a sink table (and where I am using Flink 1.13 PyFlink SQL
> within AWS Kinesis Data Analytics).  What is the best pattern to handle
> deletion of upstream data, such that the downstream table remains in sync
> with upstream?
>
> For example, at t=1, rows R1, R2, R3 are processed from the stream,
> resulting in a DB with 3 rows.  At some point between t=1 and t=2, the
> resource corresponding to R2 was deleted, such that at t=2 when the next
> crawl was carried out only rows R1 and R3 were emitted into the upstream
> stream.  How should I process the stream of events so that when I have
> finished processing the events from t=2 my downstream table also has just
> rows R1 and R3?
>
> Many thanks,
>
> John
>
>


Re: How to handle deletion of items using PyFlink SQL?

2022-06-08 Thread John Tipper
Hi Xuyang,

Thank you very much, I’ll experiment tomorrow. Do you happen to know whether 
there is a Python example of udtf() with a RowKind being set (or whether it’s 
supported)?

Many thanks,

John

Sent from my iPhone

On 8 Jun 2022, at 16:41, Xuyang  wrote:


Hi, John.
What about using a UDTF [1]?
In your UDTF, save all resources from a crawl as a set or map, s1. When t=2 arrives,
the new resources collected by the crawl form s2. I think what you want is
the deletion data, i.e. 's1' - 's2'.
So just loop to find the deleted entries and emit RowData from the UDTF's
'eval' function; the RowData can be sent with RowKind 'DELETE' [2].
'DELETE' tells the downstream that this value has been deleted.

I'll be glad if this helps you.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-functions
[2] 
https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-core/src/main/java/org/apache/flink/types/RowKind.java#L52



--

Best!
Xuyang


At 2022-06-08 20:06:17, "John Tipper"  wrote:

Hi all,

I have some reference data that is periodically emitted by a crawler mechanism 
into an upstream Kinesis data stream, where those rows are used to populate a 
sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data 
Analytics).  What is the best pattern to handle deletion of upstream data, such 
that the downstream table remains in sync with upstream?

For example, at t=1, rows R1, R2, R3 are processed from the stream, resulting 
in a DB with 3 rows.  At some point between t=1 and t=2, the resource 
corresponding to R2 was deleted, such that at t=2 when the next crawl was 
carried out only rows R1 and R3 were emitted into the upstream stream.  How 
should I process the stream of events so that when I have finished processing 
the events from t=2 my downstream table also has just rows R1 and R3?

Many thanks,

John


Re: Flink Operator 1.0.0 not working

2022-06-08 Thread Gyula Fóra
Seems like something is off with your CRD.

You could try replacing it using:

kubectl replace -f
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml

After that you should be able to deploy the examples.
Gyula

On Wed, Jun 8, 2022 at 4:03 PM Geldenhuys, Morgan Karl <
morgan.geldenh...@tu-berlin.de> wrote:

> Greetings all,
>
>
> I am trying to get the flink operator (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.0/)
> working; however, I am running into a number of issues.
>
>
> I have a fresh Kubernetes cluster running and have followed all the
> instructions for deploying the operator as per the documentation (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.0/docs/try-flink-kubernetes-operator/quick-start/
> ).
>
>
> The pods seem to start up correctly, however when I run the following
> command:
>
> kubectl create -f 
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0/examples/basic.yaml
>
> It returns with the following error:
>
>
> error: unable to recognize "
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0/examples/basic.yaml":
> no matches for kind "FlinkDeployment" in version "flink.apache.org/v1beta1
> "
>
> Any ideas?
>
>
> Regards,
>
> M.
>
>
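
A quick way to verify that the replaced CRD is registered, using the CRD name taken from the manifest URL above:

kubectl get crd flinkdeployments.flink.apache.org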


Re: Apache Flink - Rest API for num of records in/out

2022-06-08 Thread M Singh
 Hi Andreas:
Thanks for your pointers and reference.
I will check it out.

On Tuesday, June 7, 2022, 05:17:43 PM EDT, Hailu, Andreas 
 wrote:  
 
 
Hi M,
 
  
 
We had a similar requirement – we were able to solve for this by:
 
1.  Supply the operators we’re interested in acquiring metrics for through 
the various name() methods
 
2.  Use the jobid API [1] and find the operator we’ve named in the 
“vertices” array
 
  
 
[1]https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/rest_api/#jobs-jobid
 
  
 
ah
 
  
 
From: M Singh 
Sent: Tuesday, June 7, 2022 4:51 PM
To: User-Flink 
Subject: Apache Flink - Rest API for num of records in/out
 
  
 
Hi Folks:
 
  
 
I am trying to find out if I can get the number of records for an operator using 
Flink's REST API.  I've checked the docs at 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/rest_api/.  
 
 
  
 
I did see some APIs that use a vertexid, but could not find how to get that info 
without having vertex ids.
 
  
 
I am using flink 1.14.4. 
 
  
 
Can you please let me know how to get that?
 
  
 
Thanks
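
A minimal sketch of the lookup Andreas describes, assuming the JobManager REST endpoint is reachable on localhost:8081 and <jobid> stands in for a real job id:

curl http://localhost:8081/jobs/<jobid>

In the JSON response, find the operator you named via name() in the "vertices" array; each vertex entry carries metrics such as read-records and write-records.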
 


Recover watermark from savepoint

2022-06-08 Thread Sweta Kalakuntla
Hi,

I want to understand whether Flink saves the watermark in a savepoint, and if
not, how we can achieve this.

We are seeing an issue where, on recovery, the job processes some late
events that would have been discarded if the job had been running
without any downtime.

Thank you,
Sweta


Re: flink webui stdout question

2022-06-08 Thread Xuyang
Hi,
May I ask whether the .out file you looked at is the TM's or the JM's? The TM's .out file should contain the output.




--

Best!
Xuyang





On 2022-06-08 16:49:20, "陈卓宇" <2572805...@qq.com.INVALID> wrote:
>Hello:
>The SQL submitted to the Flink cluster:
>CREATE TABLE datagen (
>f_sequence INT,
>f_random INT,
>f_random_str STRING,
>ts AS localtimestamp,
>WATERMARK FOR ts AS ts
>  ) WITH (
>'connector' = 'datagen',
>-- optional options --
>'rows-per-second'='5',
>'fields.f_sequence.kind'='sequence',
>'fields.f_sequence.start'='1',
>'fields.f_sequence.end'='500',
>'fields.f_random.min'='1',
>'fields.f_random.max'='500',
>'fields.f_random_str.length'='10'
>  );
>
>  CREATE TABLE print_table (
>f_sequence INT,
>f_random INT,
>f_random_str STRING
>) WITH (
>'connector' = 'print'
>  );
>
>  INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;
>
>
>I found that the stdout tab in the Flink web UI is blank, while the logs do show the output. What causes this?
>How can I fix this so that the UI's stdout tab shows the content?


Re:How to handle deletion of items using PyFlink SQL?

2022-06-08 Thread Xuyang
Hi, John.
What about using a UDTF [1]?
In your UDTF, save all resources from a crawl as a set or map, s1. When t=2 arrives,
the new resources collected by the crawl form s2. I think what you want is
the deletion data, i.e. 's1' - 's2'.
So just loop to find the deleted entries and emit RowData from the UDTF's
'eval' function; the RowData can be sent with RowKind 'DELETE' [2].
'DELETE' tells the downstream that this value has been deleted.

I'll be glad if this helps you.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-functions
[2] 
https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-core/src/main/java/org/apache/flink/types/RowKind.java#L52







--

Best!
Xuyang




At 2022-06-08 20:06:17, "John Tipper"  wrote:

Hi all,


I have some reference data that is periodically emitted by a crawler mechanism 
into an upstream Kinesis data stream, where those rows are used to populate a 
sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data 
Analytics).  What is the best pattern to handle deletion of upstream data, such 
that the downstream table remains in sync with upstream?


For example, at t=1, rows R1, R2, R3 are processed from the stream, resulting 
in a DB with 3 rows.  At some point between t=1 and t=2, the resource 
corresponding to R2 was deleted, such that at t=2 when the next crawl was 
carried out only rows R1 and R3 were emitted into the upstream stream.  How 
should I process the stream of events so that when I have finished processing 
the events from t=2 my downstream table also has just rows R1 and R3?


Many thanks,


John
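
As a plain-Python illustration of the set difference Xuyang describes (whether a PyFlink udtf can attach RowKind.DELETE to its output is exactly the open question in this thread, so this only sketches the bookkeeping):

# resources seen by the crawler at t=1 and t=2, per the example above
s1 = {"R1", "R2", "R3"}
s2 = {"R1", "R3"}

deleted = s1 - s2  # {"R2"}: the rows to retract downstream
for key in sorted(deleted):
    print(f"emit DELETE for {key}")  # stand-in for emitting a retraction row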

Re: SourceFunction

2022-06-08 Thread Jing Ge
Hi Alexey,

There is a thread[1] discussing this issue right now. It would be great if
you could share some thoughts about your experience. Thanks!

Best regards,
Jing

[1]https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9

On Wed, Jun 8, 2022 at 4:42 PM Alexey Trenikhun  wrote:

> Hello,
> Is there plan to deprecate SourceFunction in favor of Source API? We have
> custom SourceFunction based source,  do we need to plan to rewrite it using
> new Source API ?
>
> Thanks,
> Alexey
>


Flink Remote Stateful - Crontab Scheduler

2022-06-08 Thread Himanshu Sareen
Team,


We are aware of Sending Delayed Messages, which we can use to trigger
stateful functions after a delay.

Example Use Case

Invoke a Remote Stateful Function every day @ 11 AM


We are using the Python API and didn't come across out-of-the-box support for
crontab-like scheduling.

It would be great if anyone could share views on what the best/most optimized solution would be.

Regards,
Himanshu
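
A plain-Python sketch of the self-rescheduling idea: compute the delay until the next 11 AM and hand it to a delayed message. The actual delayed send is only indicated in a comment, since the exact StateFun Python API call should be checked against the SDK docs:

from datetime import datetime, timedelta

def delay_until_next_11am(now: datetime) -> timedelta:
    # next occurrence of 11:00: today if it is still ahead, otherwise tomorrow
    target = now.replace(hour=11, minute=0, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)
    return target - now

# inside the stateful function, send yourself a delayed message with this
# delay (via the SDK's delayed-message support), and re-schedule each time
# the message fires
print(delay_until_next_11am(datetime.now()))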


Re: flink k8s ha

2022-06-08 Thread Weihua Hu
Hi,
Deleting the deployment removes the Pods, Services, flink-conf configmap, etc. associated with it. However, the
HA-related configmaps are not given an owner reference, so they are not deleted. The main purpose is to let the
cluster recover from the previous HA state when it restarts. See the official documentation for more details [1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up
Best,
Weihua


On Wed, Jun 8, 2022 at 4:24 PM json <18042304...@163.com> wrote:

> The configmaps are as follows:
> sql-test--jobmanager-leader
> sql-test-resourcemanager-leader
> sql-test-restserver-leader
> sql-test-dispatcher-leader
>
>
>
> On 2022-06-08 15:42:52, "json" <18042304...@163.com> wrote:
>
> Flink 1.13.6 on k8s in application mode, with HA configured:
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: oss
> This creates configmaps on k8s.
>
>
> 1. After the job's deployment is deleted on k8s, why do these configmaps still exist? (The job is gone, so this HA data shouldn't be needed anymore, right?)
> 2. When the job is restarted, it still reads the HA configuration from these configmaps. That logic also seems odd: why should a restarted job read the previous HA information?
>
> Why do I care about this? Because I ran into a problem:
> On restart the job fails because it cannot find the file
> /high-availability.storageDir/task/completedCheckpointe5c125ad20ea,
> but OSS does contain the file
> /high-availability.storageDir/task/completedCheckpointe/completedCheckpointacdfb4309903
> which kept my job failing; it only ran normally after I deleted the configmaps above.
>
>
>
>
>
>


Re: flink webui stdout question

2022-06-08 Thread Weihua Hu
By "the logs do show the output", do you mean that taskmanager.out in the log directory has content?
Did you specify the log files when starting the cluster?

Best,
Weihua


On Wed, Jun 8, 2022 at 4:51 PM 陈卓宇 <2572805...@qq.com.invalid> wrote:

> Hello:
> Flink version: 1.13.1
> Deployment: on k8s
>
> The SQL submitted to the Flink cluster:
> CREATE TABLE datagen (
> f_sequence INT,
> f_random INT,
> f_random_str STRING,
> ts AS localtimestamp,
> WATERMARK FOR ts AS ts
>   ) WITH (
> 'connector' = 'datagen',
> -- optional options --
> 'rows-per-second'='5',
> 'fields.f_sequence.kind'='sequence',
> 'fields.f_sequence.start'='1',
> 'fields.f_sequence.end'='500',
> 'fields.f_random.min'='1',
> 'fields.f_random.max'='500',
> 'fields.f_random_str.length'='10'
>   );
>
>   CREATE TABLE print_table (
> f_sequence INT,
> f_random INT,
> f_random_str STRING
> ) WITH (
> 'connector' = 'print'
>   );
>
>   INSERT INTO print_table select f_sequence,f_random,f_random_str from
> datagen;
>
>
> I found that the stdout tab in the Flink web UI is blank, while the logs do show the output. What causes this?
> How can I fix this so that the UI's stdout tab shows the content?
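
On a standalone setup, the print connector's output lands in the TaskManager's .out file, which is what the web UI's stdout tab reads; a quick check (the path and file pattern are illustrative, and a k8s deployment may route stdout to the console instead):

tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.out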


SourceFunction

2022-06-08 Thread Alexey Trenikhun
Hello,
Is there a plan to deprecate SourceFunction in favor of the Source API? We have a
custom SourceFunction-based source; do we need to plan to rewrite it using the new
Source API?

Thanks,
Alexey


Flink Operator 1.0.0 not working

2022-06-08 Thread Geldenhuys, Morgan Karl
Greetings all,


I am trying to get the flink operator 
(https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.0/)
 working; however, I am running into a number of issues.



I have a fresh Kubernetes cluster running and have followed all the 
instructions for deploying the operator as per the documentation 
(https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.0/docs/try-flink-kubernetes-operator/quick-start/).


The pods seem to start up correctly, however when I run the following command:

kubectl create -f 
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0/examples/basic.yaml

It returns with the following error:


error: unable to recognize 
"https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0/examples/basic.yaml":
 no matches for kind "FlinkDeployment" in version "flink.apache.org/v1beta1"

Any ideas?


Regards,

M.



Kafka Consumer commit error

2022-06-08 Thread Christian Lorenz
Hi,

we have some issues with a job using the flink-sql-connector-kafka (Flink
1.15.0/standalone cluster). If one broker is restarted, e.g. for maintenance
(replication-factor=2), the taskmanagers executing the job constantly log
errors on each checkpoint creation:

Failed to commit consumer offsets for checkpoint 50659
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RetriableCommitFailedException:
 Offset commit failed with a retriable exception. You should retry committing 
the latest consumed offsets.
Caused by: 
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.CoordinatorNotAvailableException:
 The coordinator is not available.

AFAICT the error itself is produced by the underlying Kafka consumer.
Unfortunately this error cannot be reproduced on our test system.
From my understanding this error might occur once, but follow-up checkpoints /
Kafka commits should be fine again.
Currently my only way of “fixing” the issue is to restart the taskmanagers.

Is there maybe some Kafka consumer setting that would help circumvent this?

Kind regards,
Christian


Re: Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.

2022-06-08 Thread yuxia
Have you unzipped your project jar and made sure the class HiveParserFactory
exists?
Best regards, 
Yuxia 


发件人: "顾斌杰"  
收件人: luoyu...@alumni.sjtu.edu.cn 
抄送: "User"  
发送时间: 星期三, 2022年 6 月 08日 下午 5:11:33 
主题: Re: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 

You can refer to this:
https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto

On 6/8/2022 16:04, yuxia wrote:



Have you put the flink-sql-connector-hive jar into your FLINK_HOME/lib?
And make sure your JM/TM also contain the jar.

Best regards, 
Yuxia 


发件人: "顾斌杰"  
收件人: "User"  
发送时间: 星期三, 2022年 6 月 08日 下午 3:19:19 
主题: Re: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 


The following is part of the code : 

String createKafkaSql = "create table if not exists x" + 
"(x\n" + 
",update_time timestamp(3) comment '11'\n" + 
",watermark for update_time as update_time - interval '20' second)\n" + 
"with ('connector' = 'kafka'\n" + 
",'topic' = '" + topic + "'\n" + 
",'properties.bootstrap.servers' = '" + bootstrapServers + "'\n" + 
",'properties.group.id' = 'flink_sql_tyc_company_info'\n" + 
",'scan.startup.mode' = 'earliest-offset'\n" + 
",'format' = 'json','json.fail-on-missing-field' = 
'false','json.ignore-parse-errors' = 'true')"; 
tEnv.executeSql(createKafkaSql); 


tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); 
String CreateHiveSql = "create table if not exists " + 
"()\n" + 
"partitioned by (op_day string comment '111')\n" + 
"stored as orc\n" + 
"tblproperties('partition.time-extractor.timestamp-pattern'='$op_day'\n" + 
",'sink.partition-commit.trigger'='partition-time'\n" + 
",'sink.partition-commit.delay'='1h'\n" + 
",'sink.partition-commit.policy.kind'='metastore,success-file')"; 
tEnv.executeSql(CreateHiveSql); 


tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); 
String insert = "insert into x\n" + 
"select `x" + 
",date_format(update_time,'-MM-dd')\n" + 
"from x"; 
tEnv.executeSql(insert); 
On 6/8/2022 15:14, 顾斌杰 wrote:




Flink version: 1.13 





When executed in the local environment (windows), there is no exception. 


When starting the project with flink web ui, I get the following error: 





Server Response: 
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
application. 
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
 
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not execute application. 
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
 ... 7 more 

Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute 
application. 
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
 
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
 
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
 
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ... 7 more 

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 
Reason: Required context properties mismatch. The 
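
For reference, a minimal sketch of yuxia's suggestion; the jar name below is illustrative and must match your Flink/Hive/Scala versions:

cp flink-sql-connector-hive-2.3.6_2.11-1.13.2.jar $FLINK_HOME/lib/
# restart the cluster afterwards so the JM and TMs pick up the jar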

How to handle deletion of items using PyFlink SQL?

2022-06-08 Thread John Tipper
Hi all,

I have some reference data that is periodically emitted by a crawler mechanism 
into an upstream Kinesis data stream, where those rows are used to populate a 
sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data 
Analytics).  What is the best pattern to handle deletion of upstream data, such 
that the downstream table remains in sync with upstream?

For example, at t=1, rows R1, R2, R3 are processed from the stream, resulting 
in a DB with 3 rows.  At some point between t=1 and t=2, the resource 
corresponding to R2 was deleted, such that at t=2 when the next crawl was 
carried out only rows R1 and R3 were emitted into the upstream stream.  How 
should I process the stream of events so that when I have finished processing 
the events from t=2 my downstream table also has just rows R1 and R3?

Many thanks,

John


Re: TaskManager exits after Flink runs for a while, with OutOfMemoryError: Metaspace

2022-06-08 Thread Lijie Wang
From the error this is a metaspace OOM. As the message suggests, you can increase
taskmanager.memory.jvm-metaspace.size, or increase the TM's total memory.

Best,
Lijie

weishishuo...@163.com wrote on Tue, Jun 7, 2022, 18:37:

> The versions I'm using are:
> flink:1.13.2
> flink cdc: flink-connector-jdbc_2.11-1.13.2.jar
> flink-sql-connector-mysql-cdc-2.2.0.jar
> flink-sql-connector-postgres-cdc-2.2.0.jar
>
> The job is fairly simple: it just syncs data from MySQL/PG to PG/MySQL using the SQL API. Has anyone run into this problem?
>
> 2022-06-07 18:13:59,393 ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal
> error occurred while executing the TaskManager. Shutting it down...
> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
> has occurred. This can mean two things: either the job requires a larger
> size of JVM metaspace to load classes or there is a class loading leak. In
> the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
> should be increased. If the error persists (usually in cluster after
> several job (re-)submissions) then there is probably a class loading leak
> in user code or some of its dependencies which has to be investigated and
> fixed. The task executor has to be shutdown...at
> java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_112]
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> ~[?:1.8.0_112]
> at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> ~[?:1.8.0_112]
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
> ~[?:1.8.0_112]
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
> ~[?:1.8.0_112]
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
> ~[?:1.8.0_112]
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
> ~[?:1.8.0_112]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_112]
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
> ~[?:1.8.0_112]
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) [?:1.8.0_112]
> at io.debezium.relational.Column.editor(Column.java:31)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
> at
> io.debezium.connector.postgresql.connection.PostgresConnection.readTableColumn(PostgresConnection.java:464)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
> at
> io.debezium.jdbc.JdbcConnection.getColumnsDetails(JdbcConnection.java:1226)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
> at
> io.debezium.jdbc.JdbcConnection.readSchema(JdbcConnection.java:1182)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
> at
> io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:100)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
> at
> io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource.connectionCreated(PostgresSnapshotChangeEventSource.java:95)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
> at
> io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:103)
> [blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
>
>
>
>
> weishishuo...@163.com
>
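
A minimal flink-conf.yaml sketch of the suggested fix; the sizes are illustrative and should be tuned for the job:

taskmanager.memory.jvm-metaspace.size: 512m
# or raise the TM's total memory instead:
# taskmanager.memory.process.size: 4g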


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.0.0 released

2022-06-08 Thread Lijie Wang
Congrats! Thanks Yang for driving the release, and thanks to all
contributors!

Best,
Lijie

John Gerassimou wrote on Mon, Jun 6, 2022, 22:38:

> Thank you for all your efforts!
>
> Thanks
> John
>
> On Sun, Jun 5, 2022 at 10:33 PM Aitozi  wrote:
>
>> Thanks Yang, and nice to see it happen.
>>
>> Best,
>> Aitozi.
>>
>> Yang Wang wrote on Sun, Jun 5, 2022, 16:14:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache Flink Kubernetes Operator 1.0.0.
>>>
>>> The Flink Kubernetes Operator allows users to manage their Apache Flink
>>> applications and their lifecycle through native k8s tooling like kubectl.
>>> This is the first production-ready release and brings numerous
>>> improvements and new features to almost every aspect of the operator.
>>>
>>> Please check out the release blog post for an overview of the release:
>>>
>>> https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Maven artifacts for Flink Kubernetes Operator can be found at:
>>>
>>> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
>>>
>>> Official Docker image for Flink Kubernetes Operator applications can be
>>> found at:
>>> https://hub.docker.com/r/apache/flink-kubernetes-operator
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351500
>>>
>>> We would like to thank all contributors of the Apache Flink community
>>> who made this release possible!
>>>
>>> Regards,
>>> Gyula & Yang
>>>
>>



Re: Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.

2022-06-08 Thread 顾斌杰

You can refer to this:
https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto









On 6/8/2022 16:04,yuxia wrote:


Have you put the flink-sql-connector-hive jar into your FLINK_HOME/lib?
And make sure your JM/TM also contain the jar.


Best regards,
Yuxia


发件人: "顾斌杰" 
收件人: "User" 
发送时间: 星期三, 2022年 6 月 08日 下午 3:19:19
主题: Re: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.





The following is part of the code :


   String createKafkaSql = "create table if not exists x" +
"(x\n" +
",update_time timestamp(3) comment '11'\n" +
",watermark for update_time as update_time - interval '20' 
second)\n" +
"with ('connector' = 'kafka'\n" +
",'topic' = '" + topic + "'\n" +
",'properties.bootstrap.servers' = '" + bootstrapServers + 
"'\n" +
",'properties.group.id' = 'flink_sql_tyc_company_info'\n" +
",'scan.startup.mode' = 'earliest-offset'\n" +
",'format' = 'json','json.fail-on-missing-field' = 
'false','json.ignore-parse-errors' = 'true')";
tEnv.executeSql(createKafkaSql);

 
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
String CreateHiveSql = "create table if not exists " +
"()\n" +
"partitioned by (op_day string comment '111')\n" +
"stored as orc\n" +

"tblproperties('partition.time-extractor.timestamp-pattern'='$op_day'\n" +
",'sink.partition-commit.trigger'='partition-time'\n" +
",'sink.partition-commit.delay'='1h'\n" +

",'sink.partition-commit.policy.kind'='metastore,success-file')";
tEnv.executeSql(CreateHiveSql);

   
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
String insert = "insert into x\n" +
"select `x" +
",date_format(update_time,'-MM-dd')\n" +
"from x";
tEnv.executeSql(insert);






On 6/8/2022 15:14,顾斌杰 wrote:

Flink version: 1.13



When executed in the local environment (windows), there is no exception.

When starting the project with flink web ui, I get the following error:



Server Response:
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
application.
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
 ... 7 more

Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute 
application.
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ... 7 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 
Reason: Required context properties mismatch. The following properties are 
requested: table.sql-dialect=hive The following factories have been considered: 
org.apache.flink.table.planner.delegation.DefaultParserFactory
at 

flink webui stdout question

2022-06-08 Thread 陈卓宇
Hello:
Flink version: 1.13.1
Deployment: on k8s

The SQL submitted to the Flink cluster:
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
  ) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='500',
'fields.f_random.min'='1',
'fields.f_random.max'='500',
'fields.f_random_str.length'='10'
  );

  CREATE TABLE print_table (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'print'
  );

  INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;


I found that the stdout tab in the Flink web UI is blank, while the logs do show the output. What causes this?
How can I fix this so that the UI's stdout tab shows the content?

flink webui stdout question

2022-06-08 Thread 陈卓宇
Hello:
The SQL submitted to the Flink cluster:
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
  ) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='500',
'fields.f_random.min'='1',
'fields.f_random.max'='500',
'fields.f_random_str.length'='10'
  );

  CREATE TABLE print_table (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'print'
  );

  INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;


I found that the stdout tab in the Flink web UI is blank, while the logs do show the output. What causes this?
How can I fix this so that the UI's stdout tab shows the content?

Re:flink k8s ha

2022-06-08 Thread json
The configmaps are as follows:
sql-test--jobmanager-leader
sql-test-resourcemanager-leader
sql-test-restserver-leader
sql-test-dispatcher-leader



On 2022-06-08 15:42:52, "json" <18042304...@163.com> wrote:

Flink 1.13.6 on k8s in application mode, with HA configured:
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory 
high-availability.storageDir: oss
This creates configmaps on k8s.


1. After the job's deployment is deleted on k8s, why do these configmaps still exist? (The job is gone, so this HA data shouldn't be needed anymore, right?)
2. When the job is restarted, it still reads the HA configuration from these configmaps. That logic also seems odd: why should a restarted job read the previous HA information?

Why do I care about this? Because I ran into a problem:
On restart the job fails because it cannot find the file
/high-availability.storageDir/task/completedCheckpointe5c125ad20ea,
but OSS does contain the file
/high-availability.storageDir/task/completedCheckpointe/completedCheckpointacdfb4309903
which kept my job failing; it only ran normally after I deleted the configmaps above.





 

Re: Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.

2022-06-08 Thread yuxia
Have you put the flink-sql-connector-hive jar into your FLINK_HOME/lib?
And make sure your JM/TM also contain the jar.

Best regards, 
Yuxia 


发件人: "顾斌杰"  
收件人: "User"  
发送时间: 星期三, 2022年 6 月 08日 下午 3:19:19 
主题: Re: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 


The following is part of the code : 

String createKafkaSql = "create table if not exists x" + 
"(x\n" + 
",update_time timestamp(3) comment '11'\n" + 
",watermark for update_time as update_time - interval '20' second)\n" + 
"with ('connector' = 'kafka'\n" + 
",'topic' = '" + topic + "'\n" + 
",'properties.bootstrap.servers' = '" + bootstrapServers + "'\n" + 
",'properties.group.id' = 'flink_sql_tyc_company_info'\n" + 
",'scan.startup.mode' = 'earliest-offset'\n" + 
",'format' = 'json','json.fail-on-missing-field' = 
'false','json.ignore-parse-errors' = 'true')"; 
tEnv.executeSql(createKafkaSql); 


tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); 
String CreateHiveSql = "create table if not exists " + 
"()\n" + 
"partitioned by (op_day string comment '111')\n" + 
"stored as orc\n" + 
"tblproperties('partition.time-extractor.timestamp-pattern'='$op_day'\n" + 
",'sink.partition-commit.trigger'='partition-time'\n" + 
",'sink.partition-commit.delay'='1h'\n" + 
",'sink.partition-commit.policy.kind'='metastore,success-file')"; 
tEnv.executeSql(CreateHiveSql); 


tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); 
String insert = "insert into x\n" + 
"select `x" + 
",date_format(update_time,'-MM-dd')\n" + 
"from x"; 
tEnv.executeSql(insert); 
On 6/8/2022 15:14, 顾斌杰 wrote:





Flink version: 1.13 





When executed in the local environment (windows), there is no exception. 


When starting the project with flink web ui, I get the following error: 





Server Response: 
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
application. 
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
 
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not execute application. 
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
 ... 7 more 

Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute 
application. 
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
 
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
 
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
 
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ... 7 more 

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 
Reason: Required context properties mismatch. The following properties are 
requested: table.sql-dialect=hive The following factories have been considered: 
org.apache.flink.table.planner.delegation.DefaultParserFactory 
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
 ... 10 more 

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 

flink k8s ha

2022-06-08 Thread json
Flink 1.13.6 on k8s in application mode, with HA configured:
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory 
high-availability.storageDir: oss
This creates configmaps on k8s.


1. After the job's deployment is deleted on k8s, why do these configmaps still exist? (The job is gone, so this HA data shouldn't be needed anymore, right?)
2. When the job is restarted, it still reads the HA configuration from these configmaps. That logic also seems odd: why should a restarted job read the previous HA information?

Why do I care about this? Because I ran into a problem:
On restart the job fails because it cannot find the file
/high-availability.storageDir/task/completedCheckpointe5c125ad20ea,
but OSS does contain the file
/high-availability.storageDir/task/completedCheckpointe/completedCheckpointacdfb4309903
which kept my job failing; it only ran normally after I deleted the configmaps above.

Re: Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.

2022-06-08 Thread 顾斌杰



The following is part of the code :


   String createKafkaSql = "create table if not exists x" +
"(x\n" +
",update_time timestamp(3) comment '11'\n" +
",watermark for update_time as update_time - interval '20' 
second)\n" +
"with ('connector' = 'kafka'\n" +
",'topic' = '" + topic + "'\n" +
",'properties.bootstrap.servers' = '" + bootstrapServers + 
"'\n" +
",'properties.group.id' = 'flink_sql_tyc_company_info'\n" +
",'scan.startup.mode' = 'earliest-offset'\n" +
",'format' = 'json','json.fail-on-missing-field' = 
'false','json.ignore-parse-errors' = 'true')";
tEnv.executeSql(createKafkaSql);

 
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
String CreateHiveSql = "create table if not exists " +
"()\n" +
"partitioned by (op_day string comment '111')\n" +
"stored as orc\n" +

"tblproperties('partition.time-extractor.timestamp-pattern'='$op_day'\n" +
",'sink.partition-commit.trigger'='partition-time'\n" +
",'sink.partition-commit.delay'='1h'\n" +

",'sink.partition-commit.policy.kind'='metastore,success-file')";
tEnv.executeSql(CreateHiveSql);

   
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
String insert = "insert into x\n" +
"select `x" +
",date_format(update_time,'-MM-dd')\n" +
"from x";
tEnv.executeSql(insert);






On 6/8/2022 15:14,顾斌杰 wrote:


Flink version: 1.13



When executed in the local environment (windows), there is no exception.

When starting the project with flink web ui, I get the following error:



Server Response:
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
application.
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
 ... 7 more

Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute 
application.
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ... 7 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 
Reason: Required context properties mismatch. The following properties are 
requested: table.sql-dialect=hive The following factories have been considered: 
org.apache.flink.table.planner.delegation.DefaultParserFactory
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
 ... 10 more

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in 

Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.

2022-06-08 Thread 顾斌杰

Flink version: 1.13



When executed in the local environment (windows), there is no exception.

When starting the project with flink web ui, I get the following error:



Server Response:
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
application.
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
 ... 7 more

Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute 
application.
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ... 7 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 
Reason: Required context properties mismatch. The following properties are 
requested: table.sql-dialect=hive The following factories have been considered: 
org.apache.flink.table.planner.delegation.DefaultParserFactory
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
 ... 10 more

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 
Reason: Required context properties mismatch. The following properties are 
requested: table.sql-dialect=hive The following factories have been considered: 
org.apache.flink.table.planner.delegation.DefaultParserFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:300)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:178)
at 
org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:164)
at 
org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:121)
at 
org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:50)
at 
org.apache.flink.table.planner.delegation.PlannerBase.createNewParser(PlannerBase.scala:143)
at 
org.apache.flink.table.planner.delegation.PlannerBase.getParser(PlannerBase.scala:149)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.getParser(TableEnvironmentImpl.java:1466)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at 
com.paat.realtime.task.core.TableCrawlerTycTask.transform(TableCrawlerTycTask.java:60)
at 
com.paat.realtime.core.TableStreamApplicationContext.execute(TableStreamApplicationContext.java:72)
at 
com.paat.realtime.application.TableCrawlerTycApplication.main(TableCrawlerTycApplication.java:17)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at