Flink 1.11.1 Application Mode job on a K8s cluster: "too old resource version" problem

2020-12-20 Thread lichunguang
A Flink 1.11.1 job running in Application Mode on a Kubernetes cluster: the JobManager restarts
roughly once an hour with the error "Fatal error occurred in
ResourceManager. io.fabric8.kubernetes.client.KubernetesClientException: too
old resource version".

Pod restart history:
 

Restart cause:
2020-12-10 07:21:19,290 ERROR
org.apache.flink.kubernetes.KubernetesResourceManager[] - Fatal
error occurred in ResourceManager.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 247468999 (248117930)
  at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_202]
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_202]
  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
2020-12-10 07:21:19,291 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
error occurred in the cluster entrypoint.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 247468999 (248117930)
  at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_202]
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_202]
  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]


From what I found online, the cause lies in line 212 of the
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient class:

@Override
public KubernetesWatch watchPodsAndDoCallback(Map<String, String> labels,
        PodCallbackHandler podCallbackHandler) {
    return new KubernetesWatch(
        this.internalClient.pods()
            .withLabels(labels)
            .watch(new KubernetesPodsWatcher(podCallbackHandler)));
}

while etcd only retains resource version information for a limited period of time:
【 I think it's standard behavior of Kubernetes to give 410 after some time
during watch. It's usually client's responsibility to handle it. In the
context of a watch, it will return HTTP_GONE when you ask to see changes for
a resourceVersion that is too old - i.e. when it can no longer tell you what
has changed since that version, since too many things have changed. In that
case, you'll need to start again, by not specifying a resourceVersion in
which case the watch will send you the current state of the thing you are
watching and then send updates from that point.】

Has anyone run into the same problem, and how did you handle it? I have a few candidate approaches and would like to discuss them with everyone.
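One possible way to handle it on the client side, sketched below with a plain fabric8 client; this is only an illustration under assumed names (it is not what Flink 1.11.1 actually does, which is to propagate the exception as a fatal error), and the label selector is made up. The idea follows the quoted Kubernetes behavior: when the server answers 410 GONE, re-create the watch without a resourceVersion so it resumes from the current state.

import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.Map;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;

public class ResilientPodWatch {

    // Re-creates the watch whenever the API server reports HTTP 410 (GONE),
    // i.e. "too old resource version". A fresh watch without a resourceVersion
    // picks up the current state and continues from there.
    public static Watch watchPods(KubernetesClient client, Map<String, String> labels) {
        return client.pods().withLabels(labels).watch(new Watcher<Pod>() {
            @Override
            public void eventReceived(Action action, Pod pod) {
                // react to ADDED / MODIFIED / DELETED pod events here
            }

            @Override
            public void onClose(KubernetesClientException cause) {
                if (cause != null && cause.getCode() == HttpURLConnection.HTTP_GONE) {
                    watchPods(client, labels);
                }
            }
        });
    }

    public static void main(String[] args) {
        KubernetesClient client = new DefaultKubernetesClient();
        watchPods(client, Collections.singletonMap("app", "my-flink-job"));
        // keep the process alive; close the client on shutdown
    }
}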






a question about KubernetesConfigOptions

2020-12-20 Thread Debasish Ghosh
Hello -

In
https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
the various supported options are declared as constants.

I see that there is no support for options like Volumes and VolumeMounts.
I also see entries for JOB_MANAGER_CPU and TASK_MANAGER_CPU, but not for
JOB_MANAGER_MEMORY and TASK_MANAGER_MEMORY. How do we accommodate these if
we want to pass them as well? I see that the class is annotated
with @PublicEvolving - just wanted to clarify whether these are planned to be
added in the future.

regards.
-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
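As a point of reference, the JobManager/TaskManager memory sizes are configured through Flink's general memory options rather than through KubernetesConfigOptions, and they can be passed like any other configuration when deploying natively on Kubernetes. A minimal sketch with illustrative values:

# flink-conf.yaml (or -D key=value on the CLI); values are illustrative
kubernetes.jobmanager.cpu: 1.0
kubernetes.taskmanager.cpu: 2.0
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 4096m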


Re: flink DataStream scala api can not overloaded method aggregate

2020-12-20 Thread hiaQ
Hello, did you ever solve this problem? I'm running into the same issue...




Re: How to choose the version of flink-shaded-hadoop-2-uber

2020-12-20 Thread Yang Wang
Yes, your understanding is correct: after the history server starts, there is a process listening on a port.

It works fine on my side; you can start it with the following command:
docker run -p 8082:8082 --env
FLINK_PROPERTIES="historyserver.archive.fs.dir: file:///tmp/flink-jobs"
flink:latest history-server

For more configuration options, see the documentation:
https://ci.apache.org/projects/flink/flink-docs-master/deployment/advanced/historyserver.html

Best,
Yang

liujian <13597820...@qq.com> wrote on Monday, Dec 21, 2020, 1:35 PM:

> My understanding is that starting a history-server launches a process that then
> exposes the specified port, but I don't seem to see that happening. Is my
> understanding wrong?
>
>
> -------- Original message --------
> From: "user-zh" <danrtsey...@gmail.com>
> Sent: Monday, Dec 21, 2020, 10:15 AM
> To: "user-zh"
> Subject: Re: How to choose the version of flink-shaded-hadoop-2-uber
>
> You don't need to modify CMD. The default entrypoint is docker-entrypoint.sh [1],
> which already supports the history server; just pass a history-server argument.
>
> [1].
> https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
>
> Best,
> Yang
>
>
> liujian <13597820...@qq.com> wrote on Sunday, Dec 20, 2020, 12:45 PM:
>
>  Thanks,
>  But I need to access the history server, so how should I do that? I changed the
>  Flink 1.12.0 Dockerfile to CMD ["history-server"] and exposed port 8082,
>  but it doesn't seem to have the desired effect.
>
>  -------- Original message --------
>  From: "user-zh" <danrtsey...@gmail.com>
>  Sent: Saturday, Dec 19, 2020, 9:35 PM
>  To: "user-zh"
>  Subject: Re: How to choose the version of flink-shaded-hadoop-2-uber
>
>  You only need to set the HADOOP_CONF_DIR environment variable on the Flink client side.
>  The Flink client automatically puts hdfs-site.xml and core-site.xml into a separate
>  ConfigMap and mounts it into the JobManager and TaskManager; both files are also
>  added to the classpath automatically. As long as flink-shaded-hadoop is in lib,
>  nothing else is needed and HDFS can be accessed directly.
>
>  Best,
>  Yang
>
>  liujian <13597820...@qq.com> wrote on Saturday, Dec 19, 2020, 8:29 PM:
>
>  > HDFS is in HA mode and needs hdfs-site.xml. How should that be handled:
>  > via a ConfigMap, or by putting hdfs-site.xml into $FLINK_HOME/conf?
>  >
>  > -------- Original message --------
>  > From: "user-zh" <danrtsey...@gmail.com>
>  > Sent: Wednesday, Dec 16, 2020, 7:21 PM
>  > To: "superainbower"
>  > Cc: "user-zh"
>  > Subject: Re: How to choose the version of flink-shaded-hadoop-2-uber
>  >
>  > If you access HDFS from Kubernetes, you still need to put flink-shaded-hadoop
>  > into the lib directory, because Hadoop's FileSystem does not currently support
>  > plugin loading.
>  >
>  > Best,
>  > Yang
>  >
>  > superainbower wrote on Wednesday, Dec 16, 2020, 6:19 PM:
>  >
>  > > Piggybacking on this thread: how do you access HDFS when deploying on K8s?
>  > > For now I still bake the shaded jar into the image.
>  > > On Dec 16, 2020, at 10:53, Yang Wang <danrtsey...@gmail.com> wrote:
>  > >
>  > > Take flink-shaded-hadoop-2-uber 2.8.3-10.0 as an example:
>  > >
>  > > 2.8.3 is the Hadoop version and 10.0 is the flink-shaded [1] version.
>  > >
>  > > Since 1.10 the community no longer recommends the flink-shaded-hadoop approach;
>  > > instead, set the HADOOP_CLASSPATH environment variable when submitting [2].
>  > > This keeps Flink "Hadoop free" and supports both Hadoop 2 and Hadoop 3.
>  > >
>  > > If you still want to use flink-shaded-hadoop, just use the latest version, 2.8.3-10.0.
>  > >
>  > > [1]. https://github.com/apache/flink-shaded
>  > > [2].
>  > > https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation
>  > >
>  > > Best,
>  > > Yang
>  > >
>  > > 赢峰 wrote on Friday, Dec 11, 2020, 8:45 AM:
>  > >
>  > > > How do I choose the flink-shaded-hadoop-2-uber version?
>  > > >
>  > > > What do the xxx-xxx parts mean?
>  > >
>  > >


Reply: Re: Does Flink 1.12 support joining Kafka with a JDBC dimension table?

2020-12-20 Thread amen...@163.com
Learned something new again today, thanks!



From: Leonard Xu
Sent: 2020-12-21 15:01
To: user-zh
Subject: Re: Does Flink 1.12 support joining Kafka with a JDBC dimension table?
>
> That makes it much clearer. So if I want an event-time join against a JDBC dimension table, I first need to feed the JDBC table's changelog into Kafka and then join against that, which is also the example the website gives, right?

Yes.


> The approach you describe sounds like a processing-time join.
Yes. A processing-time dimension-table join and the familiar lookup join share the same syntax,
because their semantics are the same: at runtime, join against the latest dimension data. They only differ
in implementation. The lookup join is just one implementation, where each incoming record queries the
database at runtime (semantically, joining the latest dimension data). There are other implementations as
well, for example keeping the latest dimension data in state and joining each incoming record against that state.

Best,
Leonard



>
>
>
> From: Leonard Xu
> Sent: 2020-12-21 14:44
> To: user-zh
> Subject: Re: Does Flink 1.12 support joining Kafka with a JDBC dimension table?
> Hi
>> After declaring a primary key and a watermark on the JDBC table and using the event-time join syntax from the docs, I found that once the job starts the JDBC operator reads all of the dimension-table data in one go and its state becomes FINISHED.
>
> That is expected. A table backed by the JDBC connector is bounded: it is scanned only once and read in full. Subsequent updates to the database table are not captured; the connector has no good way to monitor the table for updates in real time and broadcast them to downstream nodes.
>
> If you want a dimension table with real-time updates and event-time temporal semantics, the recommended approach is to ingest the database table's binlog (changelog) and temporal-join the main stream against that changelog stream, so each record is joined with the correct version of the dimension table.
>
> Alternatively, if freshness requirements are relaxed and you don't need a delayed join, you can do the dimension join with a runtime lookup (i.e. JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime). With that approach the external DB is queried once for every record of the main stream, and it supports the 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = '1min' options you configured.
>
>
> Best,
> Leonard
>


Re: Does Flink 1.12 support joining Kafka with a JDBC dimension table?

2020-12-20 Thread Leonard Xu
>
> That makes it much clearer. So if I want an event-time join against a JDBC dimension table, I first need to feed the JDBC table's changelog into Kafka and then join against that, which is also the example the website gives, right?

Yes.


> The approach you describe sounds like a processing-time join.
Yes. A processing-time dimension-table join and the familiar lookup join share the same syntax,
because their semantics are the same: at runtime, join against the latest dimension data. They only differ
in implementation. The lookup join is just one implementation, where each incoming record queries the
database at runtime (semantically, joining the latest dimension data). There are other implementations as
well, for example keeping the latest dimension data in state and joining each incoming record against that state.

Best,
Leonard



>
>
>
> From: Leonard Xu
> Sent: 2020-12-21 14:44
> To: user-zh
> Subject: Re: Does Flink 1.12 support joining Kafka with a JDBC dimension table?
> Hi
>> After declaring a primary key and a watermark on the JDBC table and using the event-time join syntax from the docs, I found that once the job starts the JDBC operator reads all of the dimension-table data in one go and its state becomes FINISHED.
>
> That is expected. A table backed by the JDBC connector is bounded: it is scanned only once and read in full. Subsequent updates to the database table are not captured; the connector has no good way to monitor the table for updates in real time and broadcast them to downstream nodes.
>
> If you want a dimension table with real-time updates and event-time temporal semantics, the recommended approach is to ingest the database table's binlog (changelog) and temporal-join the main stream against that changelog stream, so each record is joined with the correct version of the dimension table.
>
> Alternatively, if freshness requirements are relaxed and you don't need a delayed join, you can do the dimension join with a runtime lookup (i.e. JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime). With that approach the external DB is queried once for every record of the main stream, and it supports the 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = '1min' options you configured.
>
>
> Best,
> Leonard
>



Reply: Re: Does Flink 1.12 support joining Kafka with a JDBC dimension table?

2020-12-20 Thread amen...@163.com
Thanks for the reply, @Leonard Xu.

That makes it much clearer. So if I want an event-time join against a JDBC dimension table, I first need to feed the JDBC table's changelog into Kafka and then join against that, which is also the example the website gives, right?

>>> Alternatively, if freshness requirements are relaxed and you don't need a delayed join, you can do the dimension join with a runtime lookup (i.e. JOIN latest_rates
>>> FOR SYSTEM_TIME AS OF o.proctime). With that approach the external DB is queried once for every record of the main stream, and it supports the
>>> 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = '1min' options you configured.
The approach you describe sounds like a processing-time join.

best,
amenhub


From: Leonard Xu
Sent: 2020-12-21 14:44
To: user-zh
Subject: Re: Does Flink 1.12 support joining Kafka with a JDBC dimension table?
Hi
> After declaring a primary key and a watermark on the JDBC table and using the event-time join syntax from the docs, I found that once the job starts the JDBC operator reads all of the dimension-table data in one go and its state becomes FINISHED.

That is expected. A table backed by the JDBC connector is bounded: it is scanned only once and read in full. Subsequent updates to the database table are not captured; the connector has no good way to monitor the table for updates in real time and broadcast them to downstream nodes.

If you want a dimension table with real-time updates and event-time temporal semantics, the recommended approach is to ingest the database table's binlog (changelog) and temporal-join the main stream against that changelog stream, so each record is joined with the correct version of the dimension table.

Alternatively, if freshness requirements are relaxed and you don't need a delayed join, you can do the dimension join with a runtime lookup (i.e. JOIN latest_rates FOR
SYSTEM_TIME AS OF o.proctime). With that approach the external DB is queried once for every record of the main stream, and it supports the 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = '1min' options you configured.


Best,
Leonard


Re: Does Flink 1.12 support joining Kafka with a JDBC dimension table?

2020-12-20 Thread Leonard Xu
Hi
> After declaring a primary key and a watermark on the JDBC table and using the event-time join syntax from the docs, I found that once the job starts the JDBC operator reads all of the dimension-table data in one go and its state becomes FINISHED.

That is expected. A table backed by the JDBC connector is bounded: it is scanned only once and read in full. Subsequent updates to the database table are not captured; the connector has no good way to monitor the table for updates in real time and broadcast them to downstream nodes.

If you want a dimension table with real-time updates and event-time temporal semantics, the recommended approach is to ingest the database table's binlog (changelog) and temporal-join the main stream against that changelog stream, so each record is joined with the correct version of the dimension table.

Alternatively, if freshness requirements are relaxed and you don't need a delayed join, you can do the dimension join with a runtime lookup (i.e. JOIN latest_rates FOR
SYSTEM_TIME AS OF o.proctime). With that approach the external DB is queried once for every record of the main stream, and it supports the 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = '1min' options you configured.


Best,
Leonard
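A minimal sketch of the changelog-based, event-time variant described above, assuming the MySQL rates table is mirrored into a Kafka topic in Debezium JSON format; the topic name, broker address and field names are illustrative:

-- Versioned dimension table built from the database table's changelog in Kafka
CREATE TABLE rates_history (
    currency STRING,
    rate DECIMAL(38, 10),
    currency_time TIMESTAMP(3),
    WATERMARK FOR currency_time AS currency_time,
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'rates_changelog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
);

-- Event-time temporal join: each order is matched with the rate version
-- that was valid at the order's event time.
SELECT o.order_id, o.order_time, o.amount * r.rate AS amount, r.currency
FROM orders AS o
LEFT JOIN rates_history FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;

Compared with the bounded JDBC scan, each order row is now joined against the rate version whose validity interval covers the order's event time.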



Can the filter pushdown from Flink 1.10.0 still be used in Flink 1.12.0?

2020-12-20 Thread jy l
Hi:
In Flink 1.10.0 we wrote a custom connector and implemented the FilterableTableSource interface. Now that Flink 1.12.0 has been released, we want to reuse our previous Expression parsing in the new version, but we found that the old way of parsing no longer works.
Does the new version no longer support the Expression parsing approach from Flink 1.10.0? Or does it still work and I'm simply using it the wrong way? In 1.12.0, how should Expressions be parsed to produce pushed-down filters such as Or, EqualTo and LessThan?




Best regards!
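For reference, a minimal sketch of the 1.12-style ability interface that plays this role for the new connector stack (SupportsFilterPushDown on a DynamicTableSource). The class name and the policy of only consuming equality predicates are illustrative assumptions, not a statement of how an existing 1.10 connector must be ported:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;

// A real source would also implement ScanTableSource; only the pushdown part is sketched here.
public abstract class MyFilterableSource implements SupportsFilterPushDown {

    // filters this source promises to apply itself when reading
    private final List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            // Example policy: consume equality predicates, leave everything else in the plan.
            if (filter instanceof CallExpression
                    && ((CallExpression) filter).getFunctionDefinition()
                            == BuiltInFunctionDefinitions.EQUALS) {
                accepted.add(filter);
            } else {
                remaining.add(filter);
            }
        }
        pushedFilters.addAll(accepted);
        return Result.of(accepted, remaining);
    }
}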


pyflink 1.12 connecting to MySQL fails: Missing required options

2020-12-20 Thread 肖越
I defined two source DDLs in the script, but the second one always fails complaining about a missing option. I'm a PyFlink beginner; could someone please help?
# DDL definition
source_ddl2 = """CREATE TABLE ts_pf_sec_yldrate (id DECIMAL,pf_id VARCHAR,\

symbol_id VARCHAR,biz_date VARCHAR,\

ccy_type VARCHAR,cur_id_d VARCHAR,yldrate DECIMAL,\

is_valid DECIMAL,time_mark TIMESTAMP) WITH (

'connector' = 'jdbc',

'connector.url' = 'jdbc:mysql://ip:port/db_base',

'connector.table' = 'ts_pf_sec_yldrate',

'table-name' = 'ts_pf_sec_yldrate',

'connector.driver' = 'com.mysql.jdbc.Driver',

'connector.username' = 'xxx',

'connector.password' = 'xxx')

"""
Error message:
Traceback (most recent call last):
  File 
"C:/projects/dataService-calculate-code-python/src/test/test_mysql_connector.py",
 line 67, in 
print(join.to_pandas().head(6))
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py",
 line 807, in to_pandas
.collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 147, in deco
return f(*a, **kw)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
 line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: org.apache.flink.table.api.ValidationException: Unable to create a source for 
reading table 'default_catalog.default_database.ts_pf_sec_yldrate'.


Table options are:


'connector'='jdbc'
'connector.driver'='com.mysql.jdbc.Driver'
'connector.password'='xxx'
'connector.table'='ts_pf_sec_yldrate'
'connector.url'='jdbc:mysql://ip:port/db_base'
'connector.username'='xxx'
'table-name'='ts_pf_sec_yldrate'
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
at 
org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:339)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
at 
org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
at 
org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
at 
org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:62)
at 
org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:128)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at 
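The DDL above mixes the new factory key 'connector' = 'jdbc' with the legacy 'connector.*' option keys, so the new JDBC factory cannot find the options it requires. A sketch of the same table using the Flink 1.12 JDBC option keys, keeping the placeholders from the original post:

CREATE TABLE ts_pf_sec_yldrate (
    id DECIMAL,
    pf_id VARCHAR,
    symbol_id VARCHAR,
    biz_date VARCHAR,
    ccy_type VARCHAR,
    cur_id_d VARCHAR,
    yldrate DECIMAL,
    is_valid DECIMAL,
    time_mark TIMESTAMP
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ip:port/db_base',
    'table-name' = 'ts_pf_sec_yldrate',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'xxx',
    'password' = 'xxx'
)

With 'connector' = 'jdbc', the factory expects 'url' and 'table-name' (plus 'username'/'password' when authentication is needed); the 'connector.url' style keys belong to the old 'connector.type' based factory and are not picked up by the new one.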

Reply: How to choose the version of flink-shaded-hadoop-2-uber

2020-12-20 Thread liujian
My understanding is that starting a history-server launches a process that then exposes the specified port, but I don't seem to see that happening. Is my understanding wrong?




-------- Original message --------
From: "user-zh" <danrtsey...@gmail.com>

> You don't need to modify CMD. The default entrypoint is docker-entrypoint.sh [1],
> which already supports the history server; just pass a history-server argument.
>
> [1]. https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
>
> Best,
> Yang
>
>
> liujian <13597820...@qq.com> wrote on Sunday, Dec 20, 2020, 12:45 PM:
>
>  Thanks,
>  But I need to access the history server, so how should I do that? I changed the
>  Flink 1.12.0 Dockerfile to CMD ["history-server"] and exposed port 8082,
>  but it doesn't seem to have the desired effect.
>
>  -------- Original message --------
>  From: "user-zh" <danrtsey...@gmail.com>
>  Sent: Saturday, Dec 19, 2020, 9:35 PM
>  To: "user-zh"
>
>  (earlier quoted history as in the thread above, including
>  https://github.com/apache/flink-shaded and
>  https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation)

Does Flink 1.12 support joining Kafka with a JDBC dimension table?

2020-12-20 Thread amen...@163.com
hi,

Is it supported for a Kafka stream to join against the JDBC dimension-table data itself, rather than against the JDBC table's changelog?

After declaring a primary key and a watermark on the JDBC table and using the event-time join syntax from the docs, I found that once the job starts the JDBC operator reads all of the dimension-table data in one go and its state becomes FINISHED. In that case, no matter how the dimension table later changes, the Kafka stream can never join against the updated dimension data, right?

CREATE TABLE orders (
order_id STRING,
currency STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (
'connector' = 'kafka',
'topic' = 'topic_flink',
'properties.bootstrap.servers' = '10.3.12.113:9092',
'properties.group.id' = 'flink',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)

CREATE TABLE latest_rates (
currency STRING,
rate DECIMAL(38, 10),
currency_time TIMESTAMP(3),
WATERMARK FOR currency_time AS currency_time,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 
'jdbc:mysql://10.3.12.113:3306/base?useUnicode=true=utf8=PRC=false'
'username' = 'root',
'password' = 'root1234',
'table-name' = 'latest_rates',
'lookup.cache.max-rows' = '1',
'lookup.cache.ttl' = '1min'
)

SELECT
o.order_id,
o.order_time,
o.amount * r.rate AS amount,
r.currency
FROM orders AS o
LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency"

best,
amenhub







Re: A question about Flink SQL

2020-12-20 Thread 占英华
Wouldn't the results of the first SELECT and the second SELECT then differ? Executing the first one takes time, so by the time the second one runs it queries data as of a later point in time.

> On Dec 21, 2020, at 11:14, hailongwang <18868816...@163.com> wrote:
>
>
>
> Yes, you can. For example, write the results into table1, table2, ...
> Insert into table1 ...;
> Insert into table2 ...;
>
>
>
> Best,
> Hailong
>> On 2020-12-19 08:30:23, "占英华" wrote:
>> Can a Flink SQL DML statement write its results into different sink tables? If so, how should it be done?


Re: A question about Flink SQL

2020-12-20 Thread hailongwang


Yes, you can. For example, write the results into table1, table2, ...
Insert into table1 ...;
Insert into table2 ...;



Best,
Hailong
On 2020-12-19 08:30:23, "占英华" wrote:
>Can a Flink SQL DML statement write its results into different sink tables? If so, how should it be done?


Re: Question about specifying rowtime in the Table API

2020-12-20 Thread hailongwang
Hi,
You can try CAST(eventTime AS TIMESTAMP).


Best,
Hailong
On 2020-12-19 11:14:53, "ゞ野蠻遊戲χ" wrote:
>Hi everyone!
>
>When I convert a DataStream into a Table, specify a rowtime attribute, and then pass a SQL statement containing a UDTF into tableEnv.sql(), the following error is thrown: Rowtime
> attributes must not be in the input rows of a regular join. As a workaround
>you can cast the time attributes of input tables to TIMESTAMP
>before. Following the hint, I wrapped eventTime in the to_timestamp function, but it did not help. How can this be solved? The SQL is below; eventTime is the event time.
>
>
>sql: select
> tmp.metric_id 
>as metric_id,
> 
>tmp.metric_config as metric_config,
> startLat,
> destName,
> bizType,
> orderId,
> completedTime,
> orderStatus,
> startHexList,
> cityId,
> type,
> destLat,
> endHexList,
> destLng,
> createTime,
> passengerId,
> finishedTime,
> vehicleId,
> startLng,
> startName,
> eventTime
>from
> 
>htw_order_dwd_htw_order_geo_Infos,
> lateral table(
>  
>metricUdtf('aa')
> ) as 
>tmp(metric_id, metric_config)
>
>
>Thanks
>Jiazhi


Re: See lag end-to-end

2020-12-20 Thread Yun Gao
Hi Rex,

   I think Latency Marker is what you need [1].


Best,
 Yun


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#latency-tracking
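For completeness, latency tracking is disabled by default and is switched on through the metrics options; a minimal flink-conf.yaml sketch (the interval value is illustrative, and the markers add some overhead):

# emit latency markers every 1000 ms (0 disables latency tracking)
metrics.latency.interval: 1000
# optional aggregation level: single | operator | subtask
metrics.latency.granularity: operator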

--
Sender:Rex Fenley
Date:2020/12/21 04:57:59
Recipient:user
Cc:Brad Davis
Theme:See lag end-to-end

Hello,

Is there some proxy to seeing the relative time it takes for records to make it 
through an entire job plan? Maybe checkpoint alignment time would be a proxy 
for this? Is there metrics for that or something else that would provide signal 
here?

Thanks!

-- 

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com |  BLOG  |  FOLLOW US  |  LIKE US 


Re: Job submission fails in YARN application mode

2020-12-20 Thread silence
It should be -D, not -yD.





Re: Flink 1.12

2020-12-20 Thread Yang Wang
IIUC, "state.checkpoints.dir" is specifying an external checkpoint path,
which will not be cleaned up unless
the users configured it explicitly[1].

However, for "high-availability.storageDir", it will be cleaned up
automatically when all the jobs in the application
reaches to the terminal state. Moreover, not only the checkpoints, but also
the generated job graphs, user jars/artifacts
are stored in this storage. You could check the content of this directory.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention

Best,
Yang
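Pulling together the options discussed in this thread, a minimal flink-conf.yaml sketch for the Kubernetes HA setup (cluster id, paths and the retention value are illustrative):

kubernetes.cluster-id: my-flink-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# leader / JobGraph / checkpoint *pointers* go into ConfigMaps; the data itself goes here
high-availability.storageDir: file:///mnt/flink/storage/ha
# externalized checkpoints, cleaned up (or not) according to the retention setting
state.checkpoints.dir: file:///mnt/flink/storage/checkpoints
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION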

Boris Lublinsky  于2020年12月21日周一 上午10:18写道:

> I understand this.
> State storage is defined by state.checkpoints.dir, for example
>
> state.checkpoints.dir: file:///mnt/flink/storage/checkpoints
>
>
> I am talking about reference defined in 2 places
>
>
> On Dec 20, 2020, at 8:05 PM, Yang Wang  wrote:
>
> I am afraid only the state handle is stored in the ConfigMap. The real
> state data is stored in
> the distributed storage configured via "high-availability.storageDir". I
> believe you could find
> more information in this class KubernetesStateHandleStore[1].
>
> How could you find that the checkpointing information is stored twice? It
> should not happen.
>
> [1].
> https://github.com/apache/flink/blob/5f7e0dc96547fdb2f82f903ee48bf43b47ca4ae0/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java#L53
>
>
> Best,
> Yang
>
> Boris Lublinsky  于2020年12月20日周日 上午12:49写道:
>
>> Thanks Yang,
>> This is still confusing.
>> I did more experiments and see that checkpointing information is stored
>> twice - in config map and in high-availability.storageDir
>> Do we need this duplication?
>> Do we need to specify high-availability.storageDir as defined in
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration
>> Or just specifying
>>
>> high-availability: 
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>
>> Is sufficient?
>>
>>
>>
>> On Dec 17, 2020, at 10:09 PM, Yang Wang  wrote:
>>
>> The latest successful checkpoint pointer is stored in the ConfigMap, as
>> well as the JobGraph pointer.
>> They could help us recover the running jobs before you delete the K8s
>> deployment. If the HA ConfigMaps
>> are deleted, then when you create a Flink cluster with the same
>> cluster-id, it could not recover from the latest
>> successful checkpoint automatically.
>>
>> Best,
>> Yang
>>
>>
>>
>>
>> Boris Lublinsky  于2020年12月18日周五 上午11:42写道:
>>
>>> Also re reading
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#high-availability-data-clean-up
>>>
>>> This does not seem right:
>>> To keep HA data while restarting the Flink cluster, simply delete the
>>> deployment (via kubectl delete deploy ). All the Flink
>>> cluster related resources will be deleted (e.g. JobManager Deployment,
>>> TaskManager pods, services, Flink conf ConfigMap). HA related ConfigMaps
>>> will be retained because they do not set the owner reference. When
>>> restarting the cluster, all previously running jobs will be recovered and
>>> restarted from the latest successful checkpoint.
>>>
>>> Last successful checkpoint is not in the config maps, but rather on
>>> persistent volume. Config map can be safely deleted. If you restart JM, it
>>> will create a new leader anyways., So I would suggest to add owner
>>> reference there
>>>
>>>
>>> On Dec 17, 2020, at 8:49 PM, Yang Wang  wrote:
>>>
>>> Hi Boris,
>>>
>>> Thanks for your follow up response and trying the new
>>> KubernetesHAService.
>>>
>>> 1. It is a valid bug. We are not setting the service account for
>>> TaskManager pod. Before the KubernetesHAService is introduced, it works
>>> fine because the TaskManager does not need to access the K8s resource(e.g.
>>> ConfigMap) directly. I have created a ticket[1] to support setting service
>>> account for TaskManager.
>>> 2. If you directly delete the JobManager deployment, then the HA related
>>> ConfigMap will be retained. It is a by-design behavior. Because the job
>>> does not go to a terminal state(SUCCEED, FAILED, CANCELED), we need this
>>> cluster could recover in the future. If all the jobs in the application
>>> reach to the terminal state, all the HA related ConfigMaps will be cleaned
>>> up automatically. You could cancel the job and verify that. Refer here[2]
>>> for more information.
>>>
>>> For the PVC based storage, if it could support multiple read-write then
>>> the KubernetesHAService should work. Actually, it feels like a distributed
>>> storage.
>>>
>>> [1]. https://issues.apache.org/jira/browse/FLINK-20664
>>> [2].
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html
>>>
>>> Best,
>>> Yang
>>>
>>> Boris Lublinsky  于2020年12月18日周五 上午7:16写道:
>>>
 And K8 native HA works,
 But there 

Re: No execution.target specified in your configuration file

2020-12-20 Thread Ben Beasley
That worked. Thank you, Kostas.

From: Kostas Kloudas 
Date: Sunday, December 20, 2020 at 7:21 AM
To: Ben Beasley 
Cc: user@flink.apache.org 
Subject: Re: No execution.target specified in your configuration file
Hi Ben,

You can try using StreamExecutionEnvironment
streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
instead of directly creating a new one. This will allow to pick up the
configuration parameters you pass through the command line.

I hope this helps,
Kostas

On Sun, Dec 20, 2020 at 7:46 AM Ben Beasley  wrote:
>
> I was wondering if I could get help with the issue described in this 
> stackoverflow post.


Re: Setting timestamps and watermarks per Kafka partition

2020-12-20 Thread 张锴
I'm currently on Flink 1.10.1. Looking into setAutoWatermarkInterval I see private long
autoWatermarkInterval = 0;
Does that mean its default emission frequency is 0? My understanding is that a watermark is generated at the same time each timestamp is extracted, one for one; I'm not sure whether that understanding is correct.

赵一旦 wrote on Sunday, Dec 20, 2020, 11:15 PM:

> setAutoWatermarkInterval only sets the interval, i.e. it determines how often that timestamp-extraction logic of yours runs.
>
> r pp wrote on Sunday, Dec 20, 2020, 10:49 AM:
>
> > Yes.
> >
> > 张锴 wrote on Saturday, Dec 19, 2020, 5:45 PM:
> >
> > > I followed the official docs and overrode the deserialization as follows:
> > >
> > > val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
> > > props)kafkaSource.assignTimestampsAndWatermarks(new
> > > AscendingTimestampExtractor[MyType] {
> > > def extractAscendingTimestamp(element: MyType): Long =
> > > element.eventTimestamp})
> > > val stream: DataStream[MyType] = env.addSource(kafkaSource)
> > >
> > > *One question: after writing it this way, is it unnecessary to set* setAutoWatermarkInterval?
> > >
> >
>
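As a side note on the default value: a small sketch of how the interval usually ends up non-zero in a 1.10 event-time job. The 200 ms figure is what setStreamTimeCharacteristic applies implicitly, and the interval only controls how often periodic watermarks are emitted, not the per-record timestamp extraction:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkIntervalExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Switching to event time implicitly sets the auto-watermark interval to 200 ms in 1.10.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // It can also be set explicitly (milliseconds); timestamps are still extracted per record,
        // only the periodic watermark emission follows this interval.
        env.getConfig().setAutoWatermarkInterval(200L);
    }
}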


Re: Flink 1.12

2020-12-20 Thread Boris Lublinsky
I understand this.
State storage is defined by state.checkpoints.dir, for example
state.checkpoints.dir: file:///mnt/flink/storage/checkpoints

I am talking about reference defined in 2 places


> On Dec 20, 2020, at 8:05 PM, Yang Wang  wrote:
> 
> I am afraid only the state handle is stored in the ConfigMap. The real state 
> data is stored in
> the distributed storage configured via "high-availability.storageDir". I 
> believe you could find
> more information in this class KubernetesStateHandleStore[1].
> 
> How could you find that the checkpointing information is stored twice? It 
> should not happen.
> 
> [1]. 
> https://github.com/apache/flink/blob/5f7e0dc96547fdb2f82f903ee48bf43b47ca4ae0/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java#L53
>  
> 
> 
> 
> Best,
> Yang
> 
> Boris Lublinsky  > 于2020年12月20日周日 上午12:49写道:
> Thanks Yang,
> This is still confusing.
> I did more experiments and see that checkpointing information is stored twice 
> - in config map and in high-availability.storageDir
> Do we need this duplication?
> Do we need to specify high-availability.storageDir as defined in 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration
>  
> 
> Or just specifying
> 
> high-availability: 
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> Is sufficient?
>  
> 
> 
> 
>> On Dec 17, 2020, at 10:09 PM, Yang Wang > > wrote:
>> 
>> The latest successful checkpoint pointer is stored in the ConfigMap, as well 
>> as the JobGraph pointer.
>> They could help us recover the running jobs before you delete the K8s 
>> deployment. If the HA ConfigMaps
>> are deleted, then when you create a Flink cluster with the same cluster-id, 
>> it could not recover from the latest
>> successful checkpoint automatically.
>> 
>> Best,
>> Yang
>> 
>> 
>> 
>> 
>> Boris Lublinsky > > 于2020年12月18日周五 上午11:42写道:
>> Also re reading 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#high-availability-data-clean-up
>>  
>> 
>> 
>> This does not seem right:
>> To keep HA data while restarting the Flink cluster, simply delete the 
>> deployment (via kubectl delete deploy ). All the Flink cluster 
>> related resources will be deleted (e.g. JobManager Deployment, TaskManager 
>> pods, services, Flink conf ConfigMap). HA related ConfigMaps will be 
>> retained because they do not set the owner reference. When restarting the 
>> cluster, all previously running jobs will be recovered and restarted from 
>> the latest successful checkpoint.
>> 
>> Last successful checkpoint is not in the config maps, but rather on 
>> persistent volume. Config map can be safely deleted. If you restart JM, it 
>> will create a new leader anyways., So I would suggest to add owner reference 
>> there
>> 
>> 
>>> On Dec 17, 2020, at 8:49 PM, Yang Wang >> > wrote:
>>> 
>>> Hi Boris,
>>> 
>>> Thanks for your follow up response and trying the new KubernetesHAService.
>>> 
>>> 1. It is a valid bug. We are not setting the service account for 
>>> TaskManager pod. Before the KubernetesHAService is introduced, it works 
>>> fine because the TaskManager does not need to access the K8s resource(e.g. 
>>> ConfigMap) directly. I have created a ticket[1] to support setting service 
>>> account for TaskManager. 
>>> 2. If you directly delete the JobManager deployment, then the HA related 
>>> ConfigMap will be retained. It is a by-design behavior. Because the job 
>>> does not go to a terminal state(SUCCEED, FAILED, CANCELED), we need this 
>>> cluster could recover in the future. If all the jobs in the application 
>>> reach to the terminal state, all the HA related ConfigMaps will be cleaned 
>>> up automatically. You could cancel the job and verify that. Refer here[2] 
>>> for more information.
>>> 
>>> For the PVC based storage, if it could support multiple read-write then the 
>>> KubernetesHAService should work. Actually, it feels like a distributed 
>>> storage.
>>> 
>>> [1]. https://issues.apache.org/jira/browse/FLINK-20664 
>>> 
>>> [2]. 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html
>>>  
>>> 
>>> 
>>> 

Re: How to choose the version of flink-shaded-hadoop-2-uber

2020-12-20 Thread Yang Wang
You don't need to modify CMD. The default entrypoint is docker-entrypoint.sh [1], which already supports the history server; just pass a history-server argument.

[1].
https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh

Best,
Yang


liujian <13597820...@qq.com> wrote on Sunday, Dec 20, 2020, 12:45 PM:

> Thanks,
>   But I need to access the history server, so how should I do that? I changed the
> Flink 1.12.0 Dockerfile to CMD ["history-server"] and exposed port 8082, but it doesn't seem to have the desired effect.
>
>
>
>
> -------- Original message --------
> From: "user-zh" <danrtsey...@gmail.com>
> Sent: Saturday, Dec 19, 2020, 9:35 PM
> To: "user-zh"
> Subject: Re: How to choose the version of flink-shaded-hadoop-2-uber
>
>
>
> You only need to set the HADOOP_CONF_DIR environment variable on the Flink client side.
> The Flink client automatically puts hdfs-site.xml and core-site.xml into a separate
> ConfigMap and mounts it into the JobManager and TaskManager; both files are also
> added to the classpath automatically. As long as flink-shaded-hadoop is in lib,
> nothing else is needed and HDFS can be accessed directly.
>
>
> Best,
> Yang
>
> liujian <13597820...@qq.com> wrote on Saturday, Dec 19, 2020, 8:29 PM:
>
> 
> 
> HDFS is in HA mode and needs hdfs-site.xml. How should that be handled:
> via a ConfigMap, or by putting hdfs-site.xml into $FLINK_HOME/conf?
> 
> 
>  -------- Original message --------
>  From: "user-zh" <danrtsey...@gmail.com>
>  Sent: Wednesday, Dec 16, 2020, 7:21 PM
>  To: "superainbower"
>  Cc: "user-zh"
>  Subject: Re: How to choose the version of flink-shaded-hadoop-2-uber
> 
>  If you access HDFS from Kubernetes, you still need to put flink-shaded-hadoop
>  into the lib directory, because Hadoop's FileSystem does not currently support
>  plugin loading.
> 
>  Best,
>  Yang
> 
>  superainbower wrote on Wednesday, Dec 16, 2020:
>  > Piggybacking on this thread: how do you access HDFS when deploying on K8s?
>  > For now I still bake the shaded jar into the image.
>  > On Dec 16, 2020, at 10:53, Yang Wang wrote:
>  >
>  > Take flink-shaded-hadoop-2-uber 2.8.3-10.0 as an example:
>  >
>  > 2.8.3 is the Hadoop version and 10.0 is the flink-shaded [1] version.
>  >
>  > Since 1.10 the community no longer recommends the flink-shaded-hadoop approach;
>  > instead, set the HADOOP_CLASSPATH environment variable when submitting [2].
>  > This keeps Flink "Hadoop free" and supports both Hadoop 2 and Hadoop 3.
>  >
>  > If you still want to use flink-shaded-hadoop, just use the latest version, 2.8.3-10.0.
>  >
>  > [1]. https://github.com/apache/flink-shaded
>  > [2].
>  > https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation
>  >
>  > Best,
>  > Yang
>  >
>  > 赢峰 wrote on Friday, Dec 11, 2020, 8:45 AM:
>  >
>  > > How do I choose the flink-shaded-hadoop-2-uber version?
>  > >
>  > > What do the xxx-xxx parts mean?
>  > >
>  > >
>  > >
>  >
>  >

Re: Flink 1.12

2020-12-20 Thread Yang Wang
I am afraid only the state handle is stored in the ConfigMap. The real
state data is stored in
the distributed storage configured via "high-availability.storageDir". I
believe you could find
more information in this class KubernetesStateHandleStore[1].

How could you find that the checkpointing information is stored twice? It
should not happen.

[1].
https://github.com/apache/flink/blob/5f7e0dc96547fdb2f82f903ee48bf43b47ca4ae0/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java#L53


Best,
Yang

Boris Lublinsky  于2020年12月20日周日 上午12:49写道:

> Thanks Yang,
> This is still confusing.
> I did more experiments and see that checkpointing information is stored
> twice - in config map and in high-availability.storageDir
> Do we need this duplication?
> Do we need to specify high-availability.storageDir as defined in
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#example-configuration
> Or just specifying
>
> high-availability: 
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>
> Is sufficient?
>
>
>
> On Dec 17, 2020, at 10:09 PM, Yang Wang  wrote:
>
> The latest successful checkpoint pointer is stored in the ConfigMap, as
> well as the JobGraph pointer.
> They could help us recover the running jobs before you delete the K8s
> deployment. If the HA ConfigMaps
> are deleted, then when you create a Flink cluster with the same
> cluster-id, it could not recover from the latest
> successful checkpoint automatically.
>
> Best,
> Yang
>
>
>
>
> Boris Lublinsky  于2020年12月18日周五 上午11:42写道:
>
>> Also re reading
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#high-availability-data-clean-up
>>
>> This does not seem right:
>> To keep HA data while restarting the Flink cluster, simply delete the
>> deployment (via kubectl delete deploy ). All the Flink
>> cluster related resources will be deleted (e.g. JobManager Deployment,
>> TaskManager pods, services, Flink conf ConfigMap). HA related ConfigMaps
>> will be retained because they do not set the owner reference. When
>> restarting the cluster, all previously running jobs will be recovered and
>> restarted from the latest successful checkpoint.
>>
>> Last successful checkpoint is not in the config maps, but rather on
>> persistent volume. Config map can be safely deleted. If you restart JM, it
>> will create a new leader anyways., So I would suggest to add owner
>> reference there
>>
>>
>> On Dec 17, 2020, at 8:49 PM, Yang Wang  wrote:
>>
>> Hi Boris,
>>
>> Thanks for your follow up response and trying the new KubernetesHAService.
>>
>> 1. It is a valid bug. We are not setting the service account for
>> TaskManager pod. Before the KubernetesHAService is introduced, it works
>> fine because the TaskManager does not need to access the K8s resource(e.g.
>> ConfigMap) directly. I have created a ticket[1] to support setting service
>> account for TaskManager.
>> 2. If you directly delete the JobManager deployment, then the HA related
>> ConfigMap will be retained. It is a by-design behavior. Because the job
>> does not go to a terminal state(SUCCEED, FAILED, CANCELED), we need this
>> cluster could recover in the future. If all the jobs in the application
>> reach to the terminal state, all the HA related ConfigMaps will be cleaned
>> up automatically. You could cancel the job and verify that. Refer here[2]
>> for more information.
>>
>> For the PVC based storage, if it could support multiple read-write then
>> the KubernetesHAService should work. Actually, it feels like a distributed
>> storage.
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-20664
>> [2].
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html
>>
>> Best,
>> Yang
>>
>> Boris Lublinsky  于2020年12月18日周五 上午7:16写道:
>>
>>> And K8 native HA works,
>>> But there are 2 bugs in this implementation.
>>>
>>> 1. Task manager pods are running as default user account, which fails
>>> because it does not have access to config maps to get endpoint’s
>>> information. I had to add permissions to default service account to make it
>>> work. Ideally both JM and TM pods should run under the same service
>>> account.
>>> 2. When a Flink application is deleted, it clears the main config map,
>>> but not the ones used for leader election
>>>
>>>
>>> And finally it works fine with PVC based storage, as long as it is
>>> read-write many
>>>
>>>
>>> On Dec 15, 2020, at 8:40 PM, Yang Wang  wrote:
>>>
>>> Hi Boris,
>>>
>>> What is -p 10?
>>>
>>> It is the same as --parallelism 10; it sets the default parallelism to 10.
>>>
>>> does it require a special container build?
>>>
>>> No, the official flink docker image could be used
>>> directly. Unfortunately, we do not have the image now. And we are trying to
>>> figure out.
>>> You could follow the instructions below to have your own image.
>>>
>>>
>>> git clone 

Re: Re: How to control event-time windows with wall-clock time

2020-12-20 Thread guoliubi...@foxmail.com
The table formatting got stripped, so it was hard to read; here is a picture instead:
https://i.bmp.ovh/imgs/2020/12/78d7dee70d88ebc9.png

I defined a 3-second tumbling window.
The first message has eventTime 9:00:01 and arrives at actual system time 9:00:01.
The second message has eventTime 9:00:02 but arrives at actual system time 9:00:11.
The goal is to close this window and run the computation at system time 9:00:05, ignoring the late second message, rather than waiting for a third message to advance time into the next window before the window closes.
I found that ProcessingTimeoutTrigger can achieve this, but I'm not sure whether there is more detailed documentation on how triggers are used.


guoliubi...@foxmail.com
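A minimal sketch of how that trigger can be wired in, assuming Flink 1.12, a keyed stream of (key, value) tuples with timestamps and watermarks already assigned, and illustrative names; the 3 s / 4 s values mirror the scenario above:

import java.time.Duration;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeoutTrigger;

public class TimeoutWindowSketch {

    public static void apply(DataStream<Tuple2<String, Long>> input) {
        input
            .keyBy(t -> t.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(3)))
            // Fires on the wrapped event-time trigger as usual, but at the latest
            // 4 s (processing time) after the first element lands in the window.
            .trigger(ProcessingTimeoutTrigger.of(EventTimeTrigger.create(), Duration.ofSeconds(4)))
            .sum(1)
            .print();
    }
}

ProcessingTimeoutTrigger.of also has an overload with two extra boolean flags that control whether the timer is reset on new records and whether window state is cleared when the timeout fires.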
 
From: 赵一旦
Sent: 2020-12-20 23:30
To: user-zh
Subject: Re: How to control event-time windows with wall-clock time
The description is rather messy; I can't follow it.

guoliubi...@foxmail.com wrote on Thursday, Dec 17, 2020, 2:16 PM:

> Hi,
> We use eventTime as the time basis and compute a tumbling window every 3 seconds. Assume the following data:
>
>   System time | Gap to previous | Event time | Gap to previous
>   9:00:01     |                 | 9:00:01    |
>   9:00:11     | 10s             | 9:00:02    | 1s
>   9:00:12     | 1s              | 9:00:12    | 10s
>
> Judging by event time, the first and second records belong to the same window.
> However, the processing needs to force-close a window once the system time is more than 4 s past the window's first record, i.e. close it at system time 9:00:05 and ignore the second record.
> How should the watermark be generated in this case?
> I have tried
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(4L))
> and
>
> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(4L))
> but both still put the first and second records into the same window,
> so neither gives the expected result.
> How should this be set up so that the window holds only the first record and ignores the second?
>
>
> guoliubi...@foxmail.com
>


Re: Re: JDBC sink fails to insert data

2020-12-20 Thread guoliubi...@foxmail.com
That does work indeed, thanks for the pointer.



guoliubi...@foxmail.com

From: 赵一旦
Sent: 2020-12-20 23:24
To: user-zh
Subject: Re: JDBC sink fails to insert data
Hi, you are taking a big detour here.
A Flink job is just a DAG, and a single DataStream can already fan out into multiple downstream flows; no side output is needed.
Side outputs are for emitting two different kinds of results based on some logic. Here the same data goes to both Kafka and MySQL, so a side output is unnecessary; it only adds an extra stage and hurts performance.
With your code, it should be written like this:
 
sideStream.addSink(new FlinkKafkaProducer<>(
"ratio_value",
new RatioValueSerializationSchema(suffix),
PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
tool.get(SCHEMA_REGISTRY_URL)),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
 
sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
 
如上,针对sideStream直接添加2个sink即可。
 
 
r pp wrote on Saturday, Dec 19, 2020, 12:15 PM:

> A simple line of thought: how much data is there, and have you considered the write pressure on the database?
> Remove the Kafka sink and check how the writes behave,
> then compare against the behavior with the Kafka sink added back.
>
> One pipeline feeding two sinks, one fast and one slow: the fast one keeps up easily, while the slow one may not be able to throttle the flow, falls over, and then takes the whole pipeline down with it.
>
> guoliubi...@foxmail.com wrote on Friday, Dec 18, 2020, 2:01 PM:
>
> > Hi,
> >
> > I'm using Flink 1.12 and have a job whose data must end up in both Kafka and a database, so I added a side output in the final step; the code is as follows:
> > .process(new ProcessFunction() {
> > @Override
> > public void processElement(RatioValuevalue, Context ctx,
> > Collector out) throws Exception {
> > out.collect(value);
> > ctx.output(ratioOutputTag, value);
> > }
> > });
> > sideStream.addSink(new FlinkKafkaProducer<>(
> > "ratio_value",
> > new RatioValueSerializationSchema(suffix),
> > PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
> > tool.get(SCHEMA_REGISTRY_URL)),
> > FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
> > DataStream ratioSideStream =
> > sideStream.getSideOutput(ratioOutputTag);
> >
> ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
> > When it actually runs, the data lands in Kafka correctly, but the JDBC sink sometimes works after a job restart and sometimes still does not.
> > Running in a local environment with a breakpoint inside JdbcSink's sink method, the breakpoint is never hit, so it seems JdbcSink is never reached.
> > Is there any way to troubleshoot this situation?
> >
> >
> > guoliubi...@foxmail.com
> >
>


?????? ????????flink1.12????????????????state????

2020-12-20 Thread ??????
??




----
??: 
   "user-zh"



See lag end-to-end

2020-12-20 Thread Rex Fenley
Hello,

Is there some proxy to seeing the relative time it takes for records to
make it through an entire job plan? Maybe checkpoint alignment time would
be a proxy for this? Is there metrics for that or something else that would
provide signal here?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: How to control event-time windows with wall-clock time

2020-12-20 Thread 赵一旦
The description is rather messy; I can't follow it.

guoliubi...@foxmail.com wrote on Thursday, Dec 17, 2020, 2:16 PM:

> Hi,
> We use eventTime as the time basis and compute a tumbling window every 3 seconds. Assume the following data:
>
>   System time | Gap to previous | Event time | Gap to previous
>   9:00:01     |                 | 9:00:01    |
>   9:00:11     | 10s             | 9:00:02    | 1s
>   9:00:12     | 1s              | 9:00:12    | 10s
>
> Judging by event time, the first and second records belong to the same window.
> However, the processing needs to force-close a window once the system time is more than 4 s past the window's first record, i.e. close it at system time 9:00:05 and ignore the second record.
> How should the watermark be generated in this case?
> I have tried
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(4L))
> and
>
> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(4L))
> but both still put the first and second records into the same window,
> so neither gives the expected result.
> How should this be set up so that the window holds only the first record and ignores the second?
>
>
> guoliubi...@foxmail.com
>


Re: JDBC sink fails to insert data

2020-12-20 Thread 赵一旦
Hi, you are taking a big detour here.
A Flink job is just a DAG, and a single DataStream can already fan out into multiple downstream flows; no side output is needed.
Side outputs are for emitting two different kinds of results based on some logic. Here the same data goes to both Kafka and MySQL, so a side output is unnecessary; it only adds an extra stage and hurts performance.
With your code, it should be written like this:

sideStream.addSink(new FlinkKafkaProducer<>(
"ratio_value",
new RatioValueSerializationSchema(suffix),
PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
tool.get(SCHEMA_REGISTRY_URL)),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));

如上,针对sideStream直接添加2个sink即可。


r pp wrote on Saturday, Dec 19, 2020, 12:15 PM:

> A simple line of thought: how much data is there, and have you considered the write pressure on the database?
> Remove the Kafka sink and check how the writes behave,
> then compare against the behavior with the Kafka sink added back.
>
> One pipeline feeding two sinks, one fast and one slow: the fast one keeps up easily, while the slow one may not be able to throttle the flow, falls over, and then takes the whole pipeline down with it.
>
> guoliubi...@foxmail.com wrote on Friday, Dec 18, 2020, 2:01 PM:
>
> > Hi,
> >
> > I'm using Flink 1.12 and have a job whose data must end up in both Kafka and a database, so I added a side output in the final step; the code is as follows:
> > .process(new ProcessFunction() {
> > @Override
> > public void processElement(RatioValuevalue, Context ctx,
> > Collector out) throws Exception {
> > out.collect(value);
> > ctx.output(ratioOutputTag, value);
> > }
> > });
> > sideStream.addSink(new FlinkKafkaProducer<>(
> > "ratio_value",
> > new RatioValueSerializationSchema(suffix),
> > PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
> > tool.get(SCHEMA_REGISTRY_URL)),
> > FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
> > DataStream ratioSideStream =
> > sideStream.getSideOutput(ratioOutputTag);
> >
> ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
> > When it actually runs, the data lands in Kafka correctly, but the JDBC sink sometimes works after a job restart and sometimes still does not.
> > Running in a local environment with a breakpoint inside JdbcSink's sink method, the breakpoint is never hit, so it seems JdbcSink is never reached.
> > Is there any way to troubleshoot this situation?
> >
> >
> > guoliubi...@foxmail.com
> >
>


Re: No execution.target specified in your configuration file

2020-12-20 Thread Kostas Kloudas
Hi Ben,

You can try using StreamExecutionEnvironment
streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
instead of directly creating a new one. This will allow to pick up the
configuration parameters you pass through the command line.

I hope this helps,
Kostas

On Sun, Dec 20, 2020 at 7:46 AM Ben Beasley  wrote:
>
> I was wondering if I could get help with the issue described in this 
> stackoverflow post.


Re: Flink 1.11.2 checkpoint failures

2020-12-20 Thread 赵一旦
A question back: are you running on a YARN cluster or a standalone cluster, and with how many resources?
When I tried RocksDB here it caused severe backpressure; so far the filesystem state backend performs much better than RocksDB for me.

r pp wrote on Saturday, Dec 19, 2020, 3:35 PM:

> I think providing the complete failure information, your resource configuration and some sample code would make it much easier for others to answer your question.
>
> zhy wrote on Friday, Dec 18, 2020, 4:07 PM:
>
> >
> >
> To add some details: the state backend is RocksDB, the checkpoint interval is 15 minutes and the timeout is 5 minutes. A 5-minute timeout already feels large, yet the checkpoint thread still gets interrupted. Do I need to increase the timeout further?
> >
> > zhy wrote on Friday, Dec 18, 2020, 3:57 PM:
> >
> > > hi,
> > >
> > >
> >
> I'm using Flink 1.11.2 to compute real-time features. Once the state size reaches roughly 30 GB the job can no longer make progress, and the exception logs are full of InterruptedException. Is this a cluster problem or a Flink problem? Another job with 3 GB of state keeps running fine.
> > >
> >
>


Re: Setting timestamps and watermarks per Kafka partition

2020-12-20 Thread 赵一旦
setAutoWatermarkInterval only sets the interval, i.e. it determines how often that timestamp-extraction logic of yours runs.

r pp wrote on Sunday, Dec 20, 2020, 10:49 AM:

> Yes.
>
> 张锴 wrote on Saturday, Dec 19, 2020, 5:45 PM:
>
> > I followed the official docs and overrode the deserialization as follows:
> >
> > val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
> > props)kafkaSource.assignTimestampsAndWatermarks(new
> > AscendingTimestampExtractor[MyType] {
> > def extractAscendingTimestamp(element: MyType): Long =
> > element.eventTimestamp})
> > val stream: DataStream[MyType] = env.addSource(kafkaSource)
> >
> > *One question: after writing it this way, is it unnecessary to set* setAutoWatermarkInterval?
> >
>


Re: A question about Flink SQL

2020-12-20 Thread 赵一旦
It should be possible; note that I have only just started learning the SQL side myself.

Writing two separate SQL jobs definitely works. If you want both INSERT statements in one and the same job, that is also possible (see the sketch after this message).
It depends on how you develop your SQL: in Zeppelin, for example, there is a runAsOne setting; if you are writing code, there is the StatementSet concept.

占英华 wrote on Saturday, Dec 19, 2020, 9:32 AM:

> Can a Flink SQL DML statement write its results into different sink tables? If so, how should it be done?
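A minimal sketch of the StatementSet route mentioned above (Flink 1.12 Table API); the table names and queries are illustrative, and the tables are assumed to have been created with CREATE TABLE DDL beforehand:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class MultiSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // source_table, table1 and table2 are assumed to exist (CREATE TABLE ... WITH (...)).
        StatementSet statements = tEnv.createStatementSet();
        statements.addInsertSql("INSERT INTO table1 SELECT id, name FROM source_table");
        statements.addInsertSql("INSERT INTO table2 SELECT id, amount FROM source_table");

        // Both INSERTs are optimized together and submitted as a single job,
        // so the two writes share one pipeline.
        statements.execute();
    }
}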