Flink1.11.1版本Application Mode job on K8S集群,too old resource version问题

2020-12-20 文章 lichunguang
Flink1.11.1版本job以Application Mode在K8S集群上运行,jobmanager每个小时会重启一次,报错【Fatal error
occurred in
ResourceManager.io.fabric8.kubernetes.client.KubernetesClientException: too
old resource version】

pod重启:
 

重启原因:
2020-12-10 07:21:19,290 ERROR
org.apache.flink.kubernetes.KubernetesResourceManager[] - Fatal
error occurred in ResourceManager.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 247468999 (248117930)
  at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_202]
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_202]
  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
2020-12-10 07:21:19,291 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
error occurred in the cluster entrypoint.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource
version: 247468999 (248117930)
  at
io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
[flink-dist_2.11-1.11.1.jar:1.11.1]
  at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_202]
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_202]
  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]


网上查的原因是因为:
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient类中212行

@Override
public KubernetesWatch watchPodsAndDoCallback(Map labels,
PodCallbackHandler podCallbackHandler) {
return new KubernetesWatch(
this.internalClient.pods()
.withLabels(labels)
.watch(new 
KubernetesPodsWatcher(podCallbackHandler)));
}

而ETCD中只会保留一段时间的version信息
【 I think it's standard behavior of Kubernetes to give 410 after some time
during watch. It's usually client's responsibility to handle it. In the
context of a watch, it will return HTTP_GONE when you ask to see changes for
a resourceVersion that is too old - i.e. when it can no longer tell you what
has changed since that version, since too many things have changed. In that
case, you'll need to start again, by not specifying a resourceVersion in
which case the watch will send you the current state of the thing you are
watching and then send updates from that point.】

大家有没遇到相同的问题,是怎么处理的?我有几个处理方式,希望能跟大家一起讨论一下。




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink DataStream scala api can not overloaded method aggregate

2020-12-20 文章 hiaQ
哈喽,请问你这个问题解决了吗?我也遇到了同样的问题...



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-shaded-hadoop-2-uber版本如何选择

2020-12-20 文章 Yang Wang
是的,理解没有错,history-server会启动后listen一个端口

我这边尝试是没有问题的,你可以通过如下命令启动
docker run -p 8082:8082 --env
FLINK_PROPERTIES="historyserver.archive.fs.dir: file:///tmp/flink-jobs"
flink:latest history-server

更多配置你参考如下文档
https://ci.apache.org/projects/flink/flink-docs-master/deployment/advanced/historyserver.html

Best,
Yang

liujian <13597820...@qq.com> 于2020年12月21日周一 下午1:35写道:

> 我理解的是启动一个history-server,会有一个进程,然后会暴露指定的端口,但是我好像并没有看到这样的效果,是我的理解有错吗
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> danrtsey...@gmail.com;
> 发送时间:2020年12月21日(星期一) 上午10:15
> 收件人:"user-zh"
> 主题:Re: flink-shaded-hadoop-2-uber版本如何选择
>
>
>
>
> 你不需要修改CMD,entrypoint默认是docker-entrypoint.sh[1],是支持history-server的,只要传一个history-server的参数就可以了
>
> [1].
> https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
>
> Best,
> Yang
>
>
> liujian <13597820...@qq.com 于2020年12月20日周日 下午12:45写道:
>
>  Thanks,nbsp;
>  nbsp; nbsp; nbsp;但是我需要访问historyServer,那么应该需要如何操作我将flink
>  1.12.0的Dockerfile 修改成CMD ["history-server"]nbsp;
> 并暴露8082端口,但是好像达不到这个效果
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:
> 
> "user-zh"
> 
> <
>  danrtsey...@gmail.comgt;;
>  发送时间:nbsp;2020年12月19日(星期六) 晚上9:35
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: flink-shaded-hadoop-2-uber版本如何选择
> 
> 
> 
>  你只需要在Flink Client端设置HADOOP_CONF_DIR的环境就可以了
>  Flink
> 
> 
> Client会自动把hdfs-site.xml、core-site.xml文件通过创建一个单独ConfigMap,然后挂载给JobManager和TaskManager的
> 
> 同时这两个配置也会自动加载到classpath下,只需要lib下放了flink-shaded-hadoop,就不需要做其他事情,可以直接访问hdfs的
> 
> 
>  Best,
>  Yang
> 
>  liujian <13597820...@qq.comgt; 于2020年12月19日周六 下午8:29写道:
> 
>  gt;
>  gt;
> 
> HDFS是Ha模式,需要指定hdfs-site.xml,这该怎么处理,使用configMap还是将hdfs-site.xml放入到$FLINK_HOME/conf目录下
>  gt;
>  gt;
>  gt;
> --amp;nbsp;原始邮件amp;nbsp;--
>  gt; 发件人:
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  "user-zh"
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  <
>  gt; danrtsey...@gmail.comamp;gt;;
>  gt; 发送时间:amp;nbsp;2020年12月16日(星期三) 晚上7:21
>  gt; 收件人:amp;nbsp;"superainbower" amp;gt;;
>  gt; 抄送:amp;nbsp;"user-zh" amp;gt;;
>  gt; 主题:amp;nbsp;Re: flink-shaded-hadoop-2-uber版本如何选择
>  gt;
>  gt;
>  gt;
>  gt;
>  gt;
> 
> 如果是在K8s上面访问hdfs,还是需要把flink-shaded-hadoop放到lib目录下,因为目前hadoop的FileSystem并不支持plugin加载
>  gt;
>  gt; Best,
>  gt; Yang
>  gt;
>  gt; superainbower  于2020年12月16日周三 下午6:19写道:
>  gt;
>  gt; amp;gt; 借楼请问下,部署到K8S上怎么访问HDFS呢,目前我还是把shaded的jar打到镜像里面去
>  gt; amp;gt; 在2020年12月16日 10:53,Yang Wang <
> danrtsey...@gmail.comamp;gt;
>  写道:
>  gt; amp;gt;
>  gt; amp;gt; 以flink-shaded-hadoop-2-uber的2.8.3-10.0为例
>  gt; amp;gt;
>  gt; amp;gt; 2.8.3指的hadoop的版本,10.0指定的flink-shaded[1]的版本
>  gt; amp;gt;
>  gt; amp;gt;
>  gt;
> 
> 社区从1.10开始不再推荐使用flink-shaded-hadoop的方式,而且通过设置HADOOP_CLASSPATH环境变量来提交[2],
>  gt; amp;gt; 这样可以让Flink变得hadoop free,从而同时支持hadoop2和hadoop3
>  gt; amp;gt;
>  gt; amp;gt;
> 如果你还坚持使用flink-shaded-hadoop,那就建议使用最新的版本就可以了2.8.3-10.0
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt; [1]. https://github.com/apache/flink-shaded
>  ; gt; amp;gt;
> [2].
>  gt; amp;gt;
>  gt; amp;gt;
>  gt;
> 
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation
> 
> ;
> gt
>  <
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparationgt
> ;;
>  amp;gt
>  gt; <
> 
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparationamp;gtgt
> 
> ;
> ;
>  gt; ;
>  gt; amp;gt; Best,
>  gt; amp;gt; Yang
>  gt; amp;gt;
>  gt; amp;gt; 赢峰  于2020年12月11日周五 上午8:45写道:
>  gt; amp;gt;
>  gt; amp;gt; amp;gt; flink-shaded-hadoop-2-uber版本如何选择?
>  gt; amp;gt; amp;gt;
>  gt; amp;gt; amp;gt;
>  gt; amp;gt; amp;gt; xxx-xxx 分别表示什么意思?
>  gt; amp;gt; amp;gt;
>  gt; amp;gt; amp;gt;
>  gt; amp;gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt;


回复: Re: Flink-1.12支持kafka join jdbc维表吗

2020-12-20 文章 amen...@163.com
今天又博学了,谢谢!



 
发件人: Leonard Xu
发送时间: 2020-12-21 15:01
收件人: user-zh
主题: Re: Flink-1.12支持kafka join jdbc维表吗
>  
> 这么讲的话算是比较清晰了,所以如果想要基于事件时间进行jdbc维表Join,首先需要将jdbc维表的changelog数据接入kafka再进行join,这也是官网给的例子,对吗?
 
是的
 
 
> 你说的这种方式就是好像基于处理时间的join~
是的,基于处理时间的维表join和大家熟知的lookup关联, 
语法都是一样的,因为两者语义是一样的,就是在运行时关联最新的维表数据,只是两者实现方式不同,lookup 
关联维表只是一种实现方式,实现方式是运行时每条数据都去查询数据库(语义上就是关联了最新的维表数据),关联维表也有其他的实现方式,比如把维表最新的数据维护放在state里,在运行时每条数据去和state中的数据关联。
 
祝好
Leonard
 
 
 
>   
> 
> 
> 
> 发件人: Leonard Xu
> 发送时间: 2020-12-21 14:44
> 收件人: user-zh
> 主题: Re: Flink-1.12支持kafka join jdbc维表吗
> Hi 
>> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished
> 
> 这是正常的,jdbc 
> connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。
> 
> 如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 
> temporal join changelog流 实现关联维表的准确版本。
> 
> 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates 
> FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   
> 'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。
> 
> 
> 祝好,
> Leonard 
> 
 


Re: Flink-1.12支持kafka join jdbc维表吗

2020-12-20 文章 Leonard Xu
>  
> 这么讲的话算是比较清晰了,所以如果想要基于事件时间进行jdbc维表Join,首先需要将jdbc维表的changelog数据接入kafka再进行join,这也是官网给的例子,对吗?

是的


> 你说的这种方式就是好像基于处理时间的join~
是的,基于处理时间的维表join和大家熟知的lookup关联, 
语法都是一样的,因为两者语义是一样的,就是在运行时关联最新的维表数据,只是两者实现方式不同,lookup 
关联维表只是一种实现方式,实现方式是运行时每条数据都去查询数据库(语义上就是关联了最新的维表数据),关联维表也有其他的实现方式,比如把维表最新的数据维护放在state里,在运行时每条数据去和state中的数据关联。

祝好
Leonard



>   
> 
> 
> 
> 发件人: Leonard Xu
> 发送时间: 2020-12-21 14:44
> 收件人: user-zh
> 主题: Re: Flink-1.12支持kafka join jdbc维表吗
> Hi 
>> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished
> 
> 这是正常的,jdbc 
> connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。
> 
> 如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 
> temporal join changelog流 实现关联维表的准确版本。
> 
> 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates 
> FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   
> 'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。
> 
> 
> 祝好,
> Leonard 
> 



回复: Re: Flink-1.12支持kafka join jdbc维表吗

2020-12-20 文章 amen...@163.com
感谢@Leonard Xu 的回复,

这么讲的话算是比较清晰了,所以如果想要基于事件时间进行jdbc维表Join,首先需要将jdbc维表的changelog数据接入kafka再进行join,这也是官网给的例子,对吗?

>>> 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates 
>>> FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   
>>> 'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。
你说的这种方式就是好像基于处理时间的join~

best,
amenhub


 
发件人: Leonard Xu
发送时间: 2020-12-21 14:44
收件人: user-zh
主题: Re: Flink-1.12支持kafka join jdbc维表吗
Hi 
> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished
 
这是正常的,jdbc 
connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。
 
如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 
temporal join changelog流 实现关联维表的准确版本。
 
另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR 
SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   
'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。
 
 
祝好,
Leonard 
 


Re: Flink-1.12支持kafka join jdbc维表吗

2020-12-20 文章 Leonard Xu
Hi 
> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished

这是正常的,jdbc 
connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。

如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 
temporal join changelog流 实现关联维表的准确版本。

另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR 
SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   
'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。


祝好,
Leonard 



flink1.10.0中的filter下推还能在flink1.12.0中使用吗?

2020-12-20 文章 jy l
Hi:
我们在flink-1.10.0中自定义了connect,并实现类了FilterableTableSource接口。目前flink-1.12.0发布了,我们想将之前解析Expression放到新版本中来使用。但是发现之前解析的方式在新版本中不能使用了。
是新版本不再支持flink-1.10.0中的Expression解析方式了吗?还是之前的依旧可以用,只是我的打开方式不对,那在1.12.0中怎么解析Expression生成Or、EqualTo、LessThan等下推的Filter?




祝好!


pyflink1.12 连接Mysql报错 : Missing required options

2020-12-20 文章 肖越
在脚本中定义了两个源数据 ddl,但是第二就总会报缺option的问题,pyflink小白,求大神解答?
#DDL定义
source_ddl2 = """CREATE TABLE ts_pf_sec_yldrate (id DECIMAL,pf_id VARCHAR,\

symbol_id VARCHAR,biz_date VARCHAR,\

ccy_type VARCHAR,cur_id_d VARCHAR,yldrate DECIMAL,\

is_valid DECIMAL,time_mark TIMESTAMP) WITH (

'connector' = 'jdbc',

'connector.url' = 'jdbc:mysql://ip:port/db_base',

'connector.table' = 'ts_pf_sec_yldrate',

'table-name' = 'ts_pf_sec_yldrate',

'connector.driver' = 'com.mysql.jdbc.Driver',

'connector.username' = 'xxx',

'connector.password' = 'xxx')

"""
错误信息:
Traceback (most recent call last):
  File 
"C:/projects/dataService-calculate-code-python/src/test/test_mysql_connector.py",
 line 67, in 
print(join.to_pandas().head(6))
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py",
 line 807, in to_pandas
.collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 147, in deco
return f(*a, **kw)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
 line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: org.apache.flink.table.api.ValidationException: Unable to create a source for 
reading table 'default_catalog.default_database.ts_pf_sec_yldrate'.


Table options are:


'connector'='jdbc'
'connector.driver'='com.mysql.jdbc.Driver'
'connector.password'='xxx'
'connector.table'='ts_pf_sec_yldrate'
'connector.url'='jdbc:mysql://ip:port/db_base'
'connector.username'='xxx'
'table-name'='ts_pf_sec_yldrate'
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
at 
org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:339)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
at 
org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
at 
org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
at 
org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:62)
at 
org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:128)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at 

?????? flink-shaded-hadoop-2-uber????????????

2020-12-20 文章 liujian
??history-server,,,??,




----
??: 
   "user-zh"

https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh

Best,
Yang


liujian <13597820...@qq.com ??2020??12??20?? 12:45??

 Thanks,nbsp;
 nbsp; nbsp; 
nbsp;??historyServer,flink
 1.12.0??Dockerfile ??CMD ["history-server"]nbsp; 
??8082,??




 --nbsp;nbsp;--
 ??:

 "user-zh"

 <
 danrtsey...@gmail.comgt;;
 :nbsp;2020??12??19??(??) 9:35
 ??:nbsp;"user-zh"https://github.com/apache/flink-shaded
 gt; amp;gt; [2].
 gt; amp;gt;
 gt; amp;gt;
 gt;
 
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation
 gt
 


Flink-1.12支持kafka join jdbc维表吗

2020-12-20 文章 amen...@163.com
hi,

请问kafka join jdbc维表数据而不是join jdbc的changelog,支持吗?

在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished,这样的话按理来说不管维表数据怎么变kafka都join不到维表数据了呀?

CREATE TABLE orders (
order_id STRING,
currency STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (
'connector' = 'kafka',
'topic' = 'topic_flink',
'properties.bootstrap.servers' = '10.3.12.113:9092',
'properties.group.id' = 'flink',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)

CREATE TABLE latest_rates (
currency STRING,
rate DECIMAL(38, 10),
currency_time TIMESTAMP(3),
WATERMARK FOR currency_time AS currency_time,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 
'jdbc:mysql://10.3.12.113:3306/base?useUnicode=true=utf8=PRC=false'
'username' = 'root',
'password' = 'root1234',
'table-name' = 'latest_rates',
'lookup.cache.max-rows' = '1',
'lookup.cache.ttl' = '1min'
)

SELECT
o.order_id,
o.order_time,
o.amount * r.rate AS amount,
r.currency
FROM orders AS o
LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency"

best,
amenhub







Re: 请教个Flink sql问题

2020-12-20 文章 占英华
这样是不是第一条select和第二条的select出来的结果会有差异,因为执行第一条有耗时,第二条执行时查询的结果是在耗时后查询得到的

> 在 2020年12月21日,11:14,hailongwang <18868816...@163.com> 写道:
> 
> 
> 
> 可以的,比如将结果写入table1,table2 ……
> Insert into table1 ……;
> Insert into table2 ……;
> 
> 
> 
> Best,
> Hailong
>> 在 2020-12-19 08:30:23,"占英华"  写道:
>> Flink sql的dml语句可以将结果写入不同的sink表中吗?如果可以可以怎么处理?


Re:请教个Flink sql问题

2020-12-20 文章 hailongwang


可以的,比如将结果写入table1,table2 ……
Insert into table1 ……;
Insert into table2 ……;



Best,
Hailong
在 2020-12-19 08:30:23,"占英华"  写道:
>Flink sql的dml语句可以将结果写入不同的sink表中吗?如果可以可以怎么处理?


Re:Table api 中指定rowtime的问题

2020-12-20 文章 hailongwang
Hi,
可以试下 CAST(eventTime AS TIMESTAMP)


Best,
Hailong
在 2020-12-19 11:14:53,"ゞ野蠻遊戲χ"  写道:
>大家好!
> 
>当我把DataStream流转成Table,并且指定了rowtime,然后使用带有udtf的sql传入tableEnv.sql(),抛出如下错误:Rowtime
> attributes must not be in the input rows of a regular join. As a workaround 
>you can cast the time attributes of input tables to TIMESTAMP 
>before.我也按照提示使用to_timestamp函数套在eventTime的外面,并没有起作用,请求如何解决?先是sql,eventTime是事件时间。
>
>
>sql: select
> tmp.metric_id 
>as metric_id,
> 
>tmp.metric_config as metric_config,
> startLat,
> destName,
> bizType,
> orderId,
> completedTime,
> orderStatus,
> startHexList,
> cityId,
> type,
> destLat,
> endHexList,
> destLng,
> createTime,
> passengerId,
> finishedTime,
> vehicleId,
> startLng,
> startName,
> eventTime
>from
> 
>htw_order_dwd_htw_order_geo_Infos,
> lateral table(
>  
>metricUdtf('aa')
> ) as 
>tmp(metric_id, metric_config)
>
>
>Thanks
>Jiazhi


Re: yarn application模式提交任务失败

2020-12-20 文章 silence
应该是-D不是-yD



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 对于kafka partition 设置时间戳及watermark

2020-12-20 文章 张锴
我现在用的flink 版本1.10.1 ,我点开 setAutoWatermarkInterval 看到private long
autoWatermarkInterval = 0;
是否代表它默认的执行频率是0,我理解的意思抽取的时间戳同时生成watermark,它们是一一对应的,不知道我的理解是否正确

赵一旦  于2020年12月20日周日 下午11:15写道:

> setAutoWatermarkInterval这个只是设置interval。决定你那个抽取ts的函数的执行频率的。
>
> r pp  于2020年12月20日周日 上午10:49写道:
>
> > 是的
> >
> > 张锴  于2020年12月19日周六 下午5:45写道:
> >
> > > 我按官网操作,重写了序列化方式
> > >
> > > val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
> > > props)kafkaSource.assignTimestampsAndWatermarks(new
> > > AscendingTimestampExtractor[MyType] {
> > > def extractAscendingTimestamp(element: MyType): Long =
> > > element.eventTimestamp})
> > > val stream: DataStream[MyType] = env.addSource(kafkaSource)
> > >
> > > *有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢?
> > >
> >
>


Re: flink-shaded-hadoop-2-uber版本如何选择

2020-12-20 文章 Yang Wang
你不需要修改CMD,entrypoint默认是docker-entrypoint.sh[1],是支持history-server的,只要传一个history-server的参数就可以了

[1].
https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh

Best,
Yang


liujian <13597820...@qq.com> 于2020年12月20日周日 下午12:45写道:

> Thanks,
>   但是我需要访问historyServer,那么应该需要如何操作我将flink
> 1.12.0的Dockerfile 修改成CMD ["history-server"] 并暴露8082端口,但是好像达不到这个效果
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> danrtsey...@gmail.com;
> 发送时间:2020年12月19日(星期六) 晚上9:35
> 收件人:"user-zh"
> 主题:Re: flink-shaded-hadoop-2-uber版本如何选择
>
>
>
> 你只需要在Flink Client端设置HADOOP_CONF_DIR的环境就可以了
> Flink
>
> Client会自动把hdfs-site.xml、core-site.xml文件通过创建一个单独ConfigMap,然后挂载给JobManager和TaskManager的
> 同时这两个配置也会自动加载到classpath下,只需要lib下放了flink-shaded-hadoop,就不需要做其他事情,可以直接访问hdfs的
>
>
> Best,
> Yang
>
> liujian <13597820...@qq.com 于2020年12月19日周六 下午8:29写道:
>
> 
> 
> HDFS是Ha模式,需要指定hdfs-site.xml,这该怎么处理,使用configMap还是将hdfs-site.xml放入到$FLINK_HOME/conf目录下
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:
> 
> "user-zh"
> 
> <
>  danrtsey...@gmail.comgt;;
>  发送时间:nbsp;2020年12月16日(星期三) 晚上7:21
>  收件人:nbsp;"superainbower"  抄送:nbsp;"user-zh"  主题:nbsp;Re: flink-shaded-hadoop-2-uber版本如何选择
> 
> 
> 
> 
> 
> 如果是在K8s上面访问hdfs,还是需要把flink-shaded-hadoop放到lib目录下,因为目前hadoop的FileSystem并不支持plugin加载
> 
>  Best,
>  Yang
> 
>  superainbower  
>  gt; 借楼请问下,部署到K8S上怎么访问HDFS呢,目前我还是把shaded的jar打到镜像里面去
>  gt; 在2020年12月16日 10:53,Yang Wang  写道:
>  gt;
>  gt; 以flink-shaded-hadoop-2-uber的2.8.3-10.0为例
>  gt;
>  gt; 2.8.3指的hadoop的版本,10.0指定的flink-shaded[1]的版本
>  gt;
>  gt;
> 
> 社区从1.10开始不再推荐使用flink-shaded-hadoop的方式,而且通过设置HADOOP_CLASSPATH环境变量来提交[2],
>  gt; 这样可以让Flink变得hadoop free,从而同时支持hadoop2和hadoop3
>  gt;
>  gt; 如果你还坚持使用flink-shaded-hadoop,那就建议使用最新的版本就可以了2.8.3-10.0
>  gt;
>  gt;
>  gt; [1]. https://github.com/apache/flink-shaded
>  gt; [2].
>  gt;
>  gt;
> 
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparation
> 
> ;
> gt
>  <
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#preparationgt
> ;
>  ;
>  gt; Best,
>  gt; Yang
>  gt;
>  gt; 赢峰   gt;
>  gt; gt; flink-shaded-hadoop-2-uber版本如何选择?
>  gt; gt;
>  gt; gt;
>  gt; gt; xxx-xxx 分别表示什么意思?
>  gt; gt;
>  gt; gt;
>  gt; gt;
>  gt;
>  gt;


Re: Re: 如何通过现实时间控制事件时间的窗口

2020-12-20 文章 guoliubi...@foxmail.com
因为表格样式被吃掉了,所以看不清,用图片说明下。
https://i.bmp.ovh/imgs/2020/12/78d7dee70d88ebc9.png

定义了3秒的滚动窗口
第一条消息的eventTime是9:00:01,是在系统实际时间9:00:01收到的。
第二条消息的eventTime是9:00:02,但是是在系统实际时间9:00:11分收到的。
想要达成的目标是在系统时间9:00:05时把这一窗口关闭掉进行运算,忽略迟到的第二条消息,更不必要等到第3条消息触发到下一个窗口的时间了再关闭这个窗口。
找了下用ProcessingTimeoutTrigger可以达到目的,不过不知道有没有更详细的文档说明trigger怎么用的。


guoliubi...@foxmail.com
 
发件人: 赵一旦
发送时间: 2020-12-20 23:30
收件人: user-zh
主题: Re: 如何通过现实时间控制事件时间的窗口
描述比较乱,看不懂。
 
guoliubi...@foxmail.com  于2020年12月17日周四 下午2:16写道:
 
> Hi,
> 我们现在以eventTime作为时间标准,每3秒做一次TumbleWindow,数据假设如下
>  系统时间
>  与上一条间隔
>  事件时间
>  与上一条间隔
>  9:00:01
>
>  9:00:01
>
>  9:00:11
>  10s
>  9:00:02
>  1s
>  9:00:12
>  1s
>  9:00:12
>  10s
> 从事件时间上看,第一条和第二条数据是归集到同一窗口的。
> 不过现在的处理需要在窗口有第一条数据后系统时间超时4s以上就强制关闭该窗口,即在系统时间9:00:05时关闭窗口,忽略掉第二条数据。
> 请问这种情况需要怎么生成watermark?
> 使用过
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(4L))
> 或者
>
> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(4L))
> 结果都把第一条和第二条数据归集到同一个窗口中了,
> 都没有达到预想的结果。
> 要如何设置才能在窗口中仅有一条数据而忽略第二条数据?
>
>
> guoliubi...@foxmail.com
>


Re: Re: jdbc sink无法插入数据

2020-12-20 文章 guoliubi...@foxmail.com
确实可行,多谢指点。



guoliubi...@foxmail.com
 
发件人: 赵一旦
发送时间: 2020-12-20 23:24
收件人: user-zh
主题: Re: jdbc sink无法插入数据
Hi,你这个绕太多弯路了吧。
Flink任务构建的就是一个DAG,本身一个DataStream就可以分拆出多条数据流,不需要sideoutput。
SideOutput的作用是当你需要根据“一定逻辑”输出2类不同结果时使用,你这里是相同的一份数据输出到kafka和mysql,是不需要sideoutput的。这样只会多了一个流程,影响性能。
按照你的代码,应该如下写:
 
sideStream.addSink(new FlinkKafkaProducer<>(
"ratio_value",
new RatioValueSerializationSchema(suffix),
PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
tool.get(SCHEMA_REGISTRY_URL)),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
 
sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
 
如上,针对sideStream直接添加2个sink即可。
 
 
r pp  于2020年12月19日周六 下午12:15写道:
 
> 一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢?
> 去掉kafka sink ,看下 写入效果。
> 再对比下 加入kafka 后的效果。
>
> 一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了
>
> guoliubi...@foxmail.com  于2020年12月18日周五 下午2:01写道:
>
> > Hi,
> >
> > 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
> > .process(new ProcessFunction() {
> > @Override
> > public void processElement(RatioValuevalue, Context ctx,
> > Collector out) throws Exception {
> > out.collect(value);
> > ctx.output(ratioOutputTag, value);
> > }
> > });
> > sideStream.addSink(new FlinkKafkaProducer<>(
> > "ratio_value",
> > new RatioValueSerializationSchema(suffix),
> > PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
> > tool.get(SCHEMA_REGISTRY_URL)),
> > FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
> > DataStream ratioSideStream =
> > sideStream.getSideOutput(ratioOutputTag);
> >
> ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
> > 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。
> > 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。
> > 想问下这种情况是否有什么排查手段?
> >
> >
> > guoliubi...@foxmail.com
> >
>


?????? ????????flink1.12????????????????state????

2020-12-20 文章 ??????
??




----
??: 
   "user-zh"



Re: 如何通过现实时间控制事件时间的窗口

2020-12-20 文章 赵一旦
描述比较乱,看不懂。

guoliubi...@foxmail.com  于2020年12月17日周四 下午2:16写道:

> Hi,
> 我们现在以eventTime作为时间标准,每3秒做一次TumbleWindow,数据假设如下
>  系统时间
>  与上一条间隔
>  事件时间
>  与上一条间隔
>  9:00:01
>
>  9:00:01
>
>  9:00:11
>  10s
>  9:00:02
>  1s
>  9:00:12
>  1s
>  9:00:12
>  10s
> 从事件时间上看,第一条和第二条数据是归集到同一窗口的。
> 不过现在的处理需要在窗口有第一条数据后系统时间超时4s以上就强制关闭该窗口,即在系统时间9:00:05时关闭窗口,忽略掉第二条数据。
> 请问这种情况需要怎么生成watermark?
> 使用过
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(4L))
> 或者
>
> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(4L))
> 结果都把第一条和第二条数据归集到同一个窗口中了,
> 都没有达到预想的结果。
> 要如何设置才能在窗口中仅有一条数据而忽略第二条数据?
>
>
> guoliubi...@foxmail.com
>


Re: jdbc sink无法插入数据

2020-12-20 文章 赵一旦
Hi,你这个绕太多弯路了吧。
Flink任务构建的就是一个DAG,本身一个DataStream就可以分拆出多条数据流,不需要sideoutput。
SideOutput的作用是当你需要根据“一定逻辑”输出2类不同结果时使用,你这里是相同的一份数据输出到kafka和mysql,是不需要sideoutput的。这样只会多了一个流程,影响性能。
按照你的代码,应该如下写:

sideStream.addSink(new FlinkKafkaProducer<>(
"ratio_value",
new RatioValueSerializationSchema(suffix),
PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
tool.get(SCHEMA_REGISTRY_URL)),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

sideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));

如上,针对sideStream直接添加2个sink即可。


r pp  于2020年12月19日周六 下午12:15写道:

> 一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢?
> 去掉kafka sink ,看下 写入效果。
> 再对比下 加入kafka 后的效果。
>
> 一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了
>
> guoliubi...@foxmail.com  于2020年12月18日周五 下午2:01写道:
>
> > Hi,
> >
> > 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
> > .process(new ProcessFunction() {
> > @Override
> > public void processElement(RatioValuevalue, Context ctx,
> > Collector out) throws Exception {
> > out.collect(value);
> > ctx.output(ratioOutputTag, value);
> > }
> > });
> > sideStream.addSink(new FlinkKafkaProducer<>(
> > "ratio_value",
> > new RatioValueSerializationSchema(suffix),
> > PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL),
> > tool.get(SCHEMA_REGISTRY_URL)),
> > FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
> > DataStream ratioSideStream =
> > sideStream.getSideOutput(ratioOutputTag);
> >
> ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
> > 在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。
> > 用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。
> > 想问下这种情况是否有什么排查手段?
> >
> >
> > guoliubi...@foxmail.com
> >
>


Re: flink1.11.2检查点失败

2020-12-20 文章 赵一旦
问下,你使用的yarn集群还是standalone集群。其次多大规模的资源。
我这边之前尝试使用RocksDB,会反压很严重。目前来看file方式性能比RocksDB高很多会。

r pp  于2020年12月19日周六 下午3:35写道:

> 我觉得补充完整的 故障信息,以及你的资源配置信息,实例代码 可以更好的让别人回答你的问题
>
> zhy  于2020年12月18日周五 下午4:07写道:
>
> >
> >
> 补充一下,状态后端选择的是rocksdb,检查点间隔为15分钟,超时时间为5分钟,感觉5分钟超时已经很大了,结果检查点线程还是会被中断,是需要继续调大超时时间吗
> >
> > zhy  于2020年12月18日周五 下午3:57写道:
> >
> > > hi、
> > >
> > >
> >
> 我这面在使用flink1.11.2做实时特征的时候,状态大小大约在30g左右的时候任务就不能继续运行了,而查看异常日志发现大量的InterruptedException,请问这种情况是集群的问题还是flink的问题,而另一个3G状态的任务依然正常运行
> > >
> >
>


Re: 对于kafka partition 设置时间戳及watermark

2020-12-20 文章 赵一旦
setAutoWatermarkInterval这个只是设置interval。决定你那个抽取ts的函数的执行频率的。

r pp  于2020年12月20日周日 上午10:49写道:

> 是的
>
> 张锴  于2020年12月19日周六 下午5:45写道:
>
> > 我按官网操作,重写了序列化方式
> >
> > val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
> > props)kafkaSource.assignTimestampsAndWatermarks(new
> > AscendingTimestampExtractor[MyType] {
> > def extractAscendingTimestamp(element: MyType): Long =
> > element.eventTimestamp})
> > val stream: DataStream[MyType] = env.addSource(kafkaSource)
> >
> > *有个疑问,这样写完之后是不是不用设置*setAutoWatermarkInterval 呢?
> >
>


Re: 请教个Flink sql问题

2020-12-20 文章 赵一旦
应该可以,先说下我SQL也是刚刚开始学习哈。

你写2个SQL肯定是可以达到的,如果你是希望2个SQL在同一个作业中,也是可以的。
不清楚你是啥开发SQL,比如zeppelin的话,有个runAsOne的设置。如果是代码的话,好像有个StatementSet的概念。

占英华  于2020年12月19日周六 上午9:32写道:

> Flink sql的dml语句可以将结果写入不同的sink表中吗?如果可以可以怎么处理?