Re: Re: flinkSQL join表的历史信息保存在哪里保存多久

2020-03-11 文章 wangl...@geekplus.com.cn

Thanks, it works.


wangl...@geekplus.com.cn
 
Sender: sunfulin
Send Time: 2020-03-12 14:19
Receiver: user-zh; wanglei2
cc: jinhai.me
Subject: Re:Re: Re: flinkSQL join表的历史信息保存在哪里保存多久


这样来用:
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);




在 2020-03-12 14:11:31,"wangl...@geekplus.com.cn"  写道:
>
>这个文档是最新的吗,我直接在 IDEA 里面写这三行代码。
>StreamQueryConfig Deprecated, tableEnv 没有 queryConfig() 方法
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>StreamQueryConfig qConfig = tableEnv.queryConfig();
>
>
>
>wangl...@geekplus.com.cn 
>
> 
>Sender: jinhai wang
>Send Time: 2020-03-12 13:44
>Receiver: user-zh@flink.apache.org
>Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久
>应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
> 
> 
>在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入:
> 
>
>两个从 kafka 创建的表:
>
>tableA: key  valueA
>tableB: key  valueB 
>
>用 flink sql 提交job 运行: select  tableA.key, tableA.valueA,tableB.valueB from 
> tableA join tableB on tableA.key = tableB.key;
>这两个表的历史数据在 flink 中存在哪里?存多久呢?
>
>比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
> 
>谢谢,
>王磊
>
>
>
>wangl...@geekplus.com.cn 
>
> 



 


Re:Re: Re: flinkSQL join表的历史信息保存在哪里保存多久

2020-03-11 文章 sunfulin




这样来用:
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);







在 2020-03-12 14:11:31,"wangl...@geekplus.com.cn"  写道:
>
>这个文档是最新的吗,我直接在 IDEA 里面写这三行代码。
>StreamQueryConfig Deprecated, tableEnv 没有 queryConfig() 方法
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>StreamQueryConfig qConfig = tableEnv.queryConfig();
>
>
>
>wangl...@geekplus.com.cn 
>
> 
>Sender: jinhai wang
>Send Time: 2020-03-12 13:44
>Receiver: user-zh@flink.apache.org
>Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久
>应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
> 
> 
>在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入:
> 
>
>两个从 kafka 创建的表:
>
>tableA: key  valueA
>tableB: key  valueB 
>
>用 flink sql 提交job 运行: select  tableA.key, tableA.valueA,tableB.valueB from 
> tableA join tableB on tableA.key = tableB.key;
>这两个表的历史数据在 flink 中存在哪里?存多久呢?
>
>比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
> 
>谢谢,
>王磊
>
>
>
>wangl...@geekplus.com.cn 
>
> 


Re: Re: flinkSQL join表的历史信息保存在哪里保存多久

2020-03-11 文章 wangl...@geekplus.com.cn

这个文档是最新的吗,我直接在 IDEA 里面写这三行代码。
StreamQueryConfig Deprecated, tableEnv 没有 queryConfig() 方法
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
StreamQueryConfig qConfig = tableEnv.queryConfig();



wangl...@geekplus.com.cn 

 
Sender: jinhai wang
Send Time: 2020-03-12 13:44
Receiver: user-zh@flink.apache.org
Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久
应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time
 
 
在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入:
 

两个从 kafka 创建的表:

tableA: key  valueA
tableB: key  valueB 

用 flink sql 提交job 运行: select  tableA.key, tableA.valueA,tableB.valueB from 
tableA join tableB on tableA.key = tableB.key;
这两个表的历史数据在 flink 中存在哪里?存多久呢?

比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
 
谢谢,
王磊



wangl...@geekplus.com.cn 

 


Re: flinkSQL join 表的历史信息保存在哪里保存多久

2020-03-11 文章 jinhai wang
应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time


在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入:


两个从 kafka 创建的表:

tableA: key  valueA
tableB: key  valueB 

用 flink sql 提交job 运行: select  tableA.key, tableA.valueA,tableB.valueB from 
tableA join tableB on tableA.key = tableB.key;
这两个表的历史数据在 flink 中存在哪里?存多久呢?

比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
 
谢谢,
王磊



wangl...@geekplus.com.cn 





flinkSQL join 表的历史信息保存在哪里保存多久

2020-03-11 文章 wangl...@geekplus.com.cn

两个从 kafka 创建的表:

tableA: key  valueA
tableB: key  valueB 

用 flink sql 提交job 运行: select  tableA.key, tableA.valueA,tableB.valueB from 
tableA join tableB on tableA.key = tableB.key;
这两个表的历史数据在 flink 中存在哪里?存多久呢?

比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗?
 
谢谢,
王磊



wangl...@geekplus.com.cn 


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 wangl...@geekplus.com.cn
我试了下,是可以的。

Thanks


wangl...@geekplus.com.cn
 
Sender: Kurt Young
Send Time: 2020-03-11 19:59
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
那有可能是可以的,你可以试试看

Best,
Kurt


On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn 
 wrote:
Hi Kurt,

如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state 中恢复的功能吗?
代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state 
存储并且再次提交任务可以被访问到直接用吗?

谢谢,
王磊



wangl...@geekplus.com.cn
 
Sender: Kurt Young
Send Time: 2020-03-11 12:54
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
sql client 目前还不支持这个功能。

Best,
Kurt


On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn 
 wrote:
Hi Kurt, 
确实是可以 直接 flink  cancel -s 保存状态。
但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?

谢谢,
王磊
 
Sender: Kurt Young
Send Time: 2020-03-11 10:38
Receiver: user-zh
Subject: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
 
Best,
Kurt
 
 
On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
> 有两个表:
> tableA: key  valueA
> tableB: key  valueB
>
> 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
> 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>
> 谢谢,
> 王磊
>


Re: Re: flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 不兼容问题咨询

2020-03-11 文章 jun su
应该只能改ContinuousFileMonitoringFunction源码 , 支持多path

王智  于2020年3月4日周三 下午6:34写道:

> 我的需求是2,现在我使用的是execEnv.createInput(inputFormat()),
>
> 我先去试试 env.addSource(new InputFormatSourceFunction(..)...)。
>
> 多谢~
>
>
>
>
>
>
>
>
> 原始邮件
>
>
> 发件人:"JingsongLee"< lzljs3620...@aliyun.com.INVALID >;
>
> 发件时间:2020/3/4 17:40
>
> 收件人:"user-zh"< user-zh@flink.apache.org >;
>
> 主题: Re: flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file
> path 不兼容问题咨询
>
>
>
> Hi, 你的需求是什么?下列哪种? - 1.想用unbounded source,continuous的file
> source,监控文件夹,发送新文件,且需要支持多文件夹 - 2.只是想用bounded的input format,需要支持多文件
> 如果是1,现在仍然不支持。 如果是2,那你可以用env.addSource(new
> InputFormatSourceFunction(..)...)来支持多文件。 Best, Jingsong Lee
> --
> From:王智Send Time:2020年3月4日(星期三) 17:34 To:user-zhSubject:flink 1.8
> 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 不兼容问题咨询
> 我在使用flink 1.8 自定义 FileInputFormat
> 的时候遇到了不兼容问题,初步看了源代码,不知道自己得出的结论是否正确,并想了解一下后续趋势和进展,麻烦各位大佬抽空指点一下,先行感谢~~ 问题1:
> StreamExecutionEnvironment 为什么要做这样的限制?ContinuousFileMonitoringFunction
> 的作用是什么?  相关的代码描述如下 StreamExecutionEnvironment 对 FileInputFormat 对象有特殊的处理逻辑
> if (inputFormat instanceof FileInputFormat) {   
> @SuppressWarnings("unchecked")    FileInputFormat


Re: 关于Flink 命令行参数广播的问题

2020-03-11 文章 tison
Hi Aven,

静态字段的使用可能会很 tricky,因为只有同一个 task 的代码才运行在同一个 classloader 里。我见过想用静态字段做全局 Map
存储的,那个实际上只有并行度设置为 1 的时候语义才对。

你说启动的生命周期执行一些用户代码,那其实就是 RichFunction 的 open
方法,它就是设计来做这个的。具体可以看你的实际业务,未必要搞得这么奇怪(x

Best,
tison.


aven.wu  于2020年3月12日周四 上午10:54写道:

> Hello
>
> 还有一个问题,除了在算子的Open方法中获取这个参数还有别的地方可以获取吗?或者在Gobgraph启动的生命周期里面有哪个阶段可以被调用提执行一些用户代码。
>
> 我的需求是需要通过命令行参数初始化一些静态类的属性,全局的静态类会被算子调用以执行某些通用的功能,如果我在open方法中去初始化的话是不太优雅,并且为了保证可用必须在每个算子的Open方法中都调用,对于一些非Rich的算子使用静态方法就会有问题。
>
> Best
> Aven
>
> 发件人: zhisheng
> 发送时间: 2020年3月11日 21:16
> 收件人: user-zh
> 主题: Re: 关于Flink 命令行参数广播的问题
>
> hi,aven.wu
>
> 可以使用 ParameterTool
> 获取到传入的参数,然后通过 env.getConfig().setGlobalJobParameters(parameterTool);
>
> 在算子中可以在 open 方法里面通过
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); 获取到配置
>
> aven.wu  于2020年3月11日周三 下午3:42写道:
>
> > Hi,大家好!
> > 遇到一个问题,在使用flink run
> >
> 提交任务时命令行参数也是就那些被main方法接收的那些,不知道如何在taskManager获取到。导致必须从外部系统(hdfs,文件)来读取例如ES,mysql等连接配置。Flink是否有这种广播命令行参数的能力,如果没有是否会考虑加入这个特性。
> >
> > Best
> > Aven
> >
> >
>
>


回复: 关于Flink 命令行参数广播的问题

2020-03-11 文章 aven . wu
Hello
还有一个问题,除了在算子的Open方法中获取这个参数还有别的地方可以获取吗?或者在Gobgraph启动的生命周期里面有哪个阶段可以被调用提执行一些用户代码。
我的需求是需要通过命令行参数初始化一些静态类的属性,全局的静态类会被算子调用以执行某些通用的功能,如果我在open方法中去初始化的话是不太优雅,并且为了保证可用必须在每个算子的Open方法中都调用,对于一些非Rich的算子使用静态方法就会有问题。

Best
Aven

发件人: zhisheng
发送时间: 2020年3月11日 21:16
收件人: user-zh
主题: Re: 关于Flink 命令行参数广播的问题

hi,aven.wu

可以使用 ParameterTool
获取到传入的参数,然后通过 env.getConfig().setGlobalJobParameters(parameterTool);

在算子中可以在 open 方法里面通过
getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); 获取到配置

aven.wu  于2020年3月11日周三 下午3:42写道:

> Hi,大家好!
> 遇到一个问题,在使用flink run
> 提交任务时命令行参数也是就那些被main方法接收的那些,不知道如何在taskManager获取到。导致必须从外部系统(hdfs,文件)来读取例如ES,mysql等连接配置。Flink是否有这种广播命令行参数的能力,如果没有是否会考虑加入这个特性。
>
> Best
> Aven
>
>



Re: [SURVEY] 您在使用什么数据变更同步工具(CDC)?

2020-03-11 文章 Jark Wu
Hi Benchao,

Great feedbacks!

1) 全量初始化能力:
第一版中社区有计划引入 flink sql 直连 mysql 获取 binlog 的方案,该方案可以获取全量+增量 binlog,且有一致性保证。
但是对接db 全量+ mq binlog,将会是未来的一个工作,主要的难点在于全量如何平滑切换到 增量的 mq offset 上。

2) 自动生成watermark:
这也是 roadmap 上的一个工作。

3) binlog以state的形式存储,只需全量加载,后续只接受增量:
我理解这个和 (1) 是类似的需求。Flink SQL 对接之后 binlog,也即 binlog 数据流转成了一个动态表,也即 Flink
在维护这个动态表的数据(state)。

4) Schema 变更:
这会是生产中非常有用的一个特性。但是目前还没有很清楚的方案。

5) flink作为一个数据同步工具:
这是非常有可能的。基于 Flink 直连 db binlog 的能力,我们可以非常方便地基于该能力搭建出一个同步工具(类似 canal),基于
flink sql 丰富的生态,
可以快速对接各种 connector。这也许将来会成为一个三方库。

Best,
Jark




On Wed, 11 Mar 2020 at 23:52, Benchao Li  wrote:

> Hi,
>
> 感谢Jark发起这个话题的讨论,这个功能对于Flink SQL来讲是一个非常重要的扩展。
>
> 问卷已填,再此再提几个小想法:
> 1. 希望对接binlog时可以有全量初始化的能力,这样在Flink中我们就有了一个全表的实时状态,方便其他表与之进行join。
> 2. 希望能够自动生成watermark,这样子可以尽可能的减少接入成本。因为有些场景是其他的append
> log数据可以跟实时维护的表进行join;也有些场景是两个binlog形成的动态表互相join。
> 3. 希望可以把binlog以state的形式存储在flink里,除了第一次启动需要全量加载,后续的运维都可以再此基础上只接收增量即可。
> 4. 如此之外,如果能有schema变更感知能力是最好的。(当然这个可能很难体现在SQL里面,毕竟SQL作业在启动时就已经确定了table
> 的schema)
> 5.
>
> 最后一点,感觉不太符合flink现在的定位,但是可能会有用户会这样来使用。就是直接把flink作为一个数据同步工具,消费binlog,直接同步到其他存储里面。(可能基本不需要做任何加工的那种,而且最好是能够有自动感知schema变更,同时可以变更下游的存储系统的schema)
>
> Jark Wu  于2020年3月11日周三 下午3:00写道:
>
> > Hi, 大家好,
> >
> > Flink 社区目前正在对接一些 CDC (Change Data Capture) 工具,以期在下个版本能支持读取和处理常见的 binlog
> > 数据,所以需要调研下大家目前主要使用的 CDC 工具是什么。
> >
> > 欢迎大家填下问卷调查,您的反馈对我们非常重要,谢谢!
> >
> > http://apacheflink.mikecrm.com/wDivVQ1
> >
> > 也欢迎大家在这个邮件下讨论关于 Flink 对接 CDC 的一些想法、需求、期望。
> >
> > Best,
> > Jark
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: FlinkSQL 1.10 DDL无法指定水印

2020-03-11 文章 Jark Wu
Benchao is right.

嵌套字段是无法直接访问的,需要逐级引用到。

On Thu, 12 Mar 2020 at 00:45, Benchao Li  wrote:

> Hi 周炎,
>
> 你的`date` 和 `time`都是在嵌套结构内部的字段,需要用*request.`value`.`date`*和*
> request.`value`.`time`*来使用它们。
>
> 周炎  于2020年3月11日周三 下午5:42写道:
>
> > DDL语句如下:
> > CREATE TABLE ods_usage_naga_dsp_filter (
> > request row<`value` row > varchar,reqid varchar,source varchar,`time` varchar,`filter`
> > array> >>>,
> > event_ts as to_timestamp(concat(`date`,`time`),'-MM-ddHH:mm:ss'),
> > WATERMARK FOR event_ts AS event_ts - interval '60' second
> > )WITH (
> > 'connector.type' = 'kafka',
> > 'format.fail-on-missing-field'='false',
> > 'connector.version' = 'universal',
> > 'connector.topic' = 'xxfilter',
> > 'connector.startup-mode' = 'latest-offset',
> > 'connector.properties.zookeeper.connect'='xx:2181',
> > 'connector.properties.bootstrap.servers'='xx:9092',
> > 'connector.properties.group.id'='xx_test',
> > 'update-mode' = 'append',
> > 'format.type' = 'json'
> > );
> > 启动程序报错:
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method
> > caused an error: From line 3, column 37 to line 3, column 42: Unknown
> > identifier 'date'
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> > at
> >
> >
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> > Caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 3,
> > column 37 to line 3, column 42: Unknown identifier 'date'
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > at
> >
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> > at
> >
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > at
> >
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
> > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
> > at
> >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
> > at
> >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
> > at
> >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
> > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
> > at
> >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
> > at
> >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
> > at
> >
> >
> org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593)
> > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237)
> > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
> > at
> >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
> > at
> >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
> > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> > at
> >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
> > at
> >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
> > at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:236)
> > at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407)
> > at org.apache.calcite.sql.SqlFunction.validateCall(SqlFunction.java:200)
> > at
> >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5304)
> > at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:116)
> > at
> >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:943)
> > at
> >
> >
> org.apache.calcite.sql.validate.SqlValidatorImpl.validatePar

Re: FlinkSQL 1.10 DDL无法指定水印

2020-03-11 文章 Benchao Li
Hi 周炎,

你的`date` 和 `time`都是在嵌套结构内部的字段,需要用*request.`value`.`date`*和*
request.`value`.`time`*来使用它们。

周炎  于2020年3月11日周三 下午5:42写道:

> DDL语句如下:
> CREATE TABLE ods_usage_naga_dsp_filter (
> request row<`value` row varchar,reqid varchar,source varchar,`time` varchar,`filter`
> array> >>>,
> event_ts as to_timestamp(concat(`date`,`time`),'-MM-ddHH:mm:ss'),
> WATERMARK FOR event_ts AS event_ts - interval '60' second
> )WITH (
> 'connector.type' = 'kafka',
> 'format.fail-on-missing-field'='false',
> 'connector.version' = 'universal',
> 'connector.topic' = 'xxfilter',
> 'connector.startup-mode' = 'latest-offset',
> 'connector.properties.zookeeper.connect'='xx:2181',
> 'connector.properties.bootstrap.servers'='xx:9092',
> 'connector.properties.group.id'='xx_test',
> 'update-mode' = 'append',
> 'format.type' = 'json'
> );
> 启动程序报错:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: From line 3, column 37 to line 3, column 42: Unknown
> identifier 'date'
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3,
> column 37 to line 3, column 42: Unknown identifier 'date'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
> at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
> at
>
> org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
> at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:236)
> at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407)
> at org.apache.calcite.sql.SqlFunction.validateCall(SqlFunction.java:200)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5304)
> at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:116)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:943)
> at
>
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:490)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConvert

Re: [SURVEY] 您在使用什么数据变更同步工具(CDC)?

2020-03-11 文章 Benchao Li
Hi,

感谢Jark发起这个话题的讨论,这个功能对于Flink SQL来讲是一个非常重要的扩展。

问卷已填,再此再提几个小想法:
1. 希望对接binlog时可以有全量初始化的能力,这样在Flink中我们就有了一个全表的实时状态,方便其他表与之进行join。
2. 希望能够自动生成watermark,这样子可以尽可能的减少接入成本。因为有些场景是其他的append
log数据可以跟实时维护的表进行join;也有些场景是两个binlog形成的动态表互相join。
3. 希望可以把binlog以state的形式存储在flink里,除了第一次启动需要全量加载,后续的运维都可以再此基础上只接收增量即可。
4. 如此之外,如果能有schema变更感知能力是最好的。(当然这个可能很难体现在SQL里面,毕竟SQL作业在启动时就已经确定了table
的schema)
5.
最后一点,感觉不太符合flink现在的定位,但是可能会有用户会这样来使用。就是直接把flink作为一个数据同步工具,消费binlog,直接同步到其他存储里面。(可能基本不需要做任何加工的那种,而且最好是能够有自动感知schema变更,同时可以变更下游的存储系统的schema)

Jark Wu  于2020年3月11日周三 下午3:00写道:

> Hi, 大家好,
>
> Flink 社区目前正在对接一些 CDC (Change Data Capture) 工具,以期在下个版本能支持读取和处理常见的 binlog
> 数据,所以需要调研下大家目前主要使用的 CDC 工具是什么。
>
> 欢迎大家填下问卷调查,您的反馈对我们非常重要,谢谢!
>
> http://apacheflink.mikecrm.com/wDivVQ1
>
> 也欢迎大家在这个邮件下讨论关于 Flink 对接 CDC 的一些想法、需求、期望。
>
> Best,
> Jark
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: scaling issue Running Flink on Kubernetes

2020-03-11 文章 Eleanore Jin
Hi Flavio,

We have implemented our own flink operator, the operator will start a flink
job cluster (the application jar is already packaged together with flink in
the docker image). I believe Google's flink operator will start a session
cluster, and user can submit the flink job via REST. Not looked into lyft
one before.

Eleanore


On Wed, Mar 11, 2020 at 2:21 AM Flavio Pompermaier 
wrote:

> Sorry I wanted to mention https://github.com/lyft/flinkk8soperator (I
> don't know which one of the 2 is better)
>
> On Wed, Mar 11, 2020 at 10:19 AM Flavio Pompermaier 
> wrote:
>
>> Have you tried to use existing operators such as
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator or
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator?
>>
>> On Wed, Mar 11, 2020 at 4:46 AM Xintong Song 
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> That does't sound like a scaling issue. It's probably a data skew, that
>>> the data volume on some of the keys are significantly higher than others.
>>> I'm not familiar with this area though, and have copied Jark for you, who
>>> is one of the community experts in this area.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Wed, Mar 11, 2020 at 10:37 AM Eleanore Jin 
>>> wrote:
>>>
 _Hi Xintong,

 Thanks for the prompt reply! To answer your question:

- Which Flink version are you using?

v1.8.2

- Is this skew observed only after a scaling-up? What happens if
the parallelism is initially set to the scaled-up value?

I also tried this, it seems skew also happens even I do
 not change the parallelism, so it may not caused by scale-up/down

- Keeping the job running a while after the scale-up, does the skew
ease?

So the skew happens in such a way that: some partitions
 lags down to 0, but other partitions are still at level of 10_000, and I am
 seeing the back pressure is ok.

 Thanks a lot!
 Eleanore


 On Tue, Mar 10, 2020 at 7:03 PM Xintong Song 
 wrote:

> Hi Eleanore,
>
> I have a few more questions regarding your issue.
>
>- Which Flink version are you using?
>- Is this skew observed only after a scaling-up? What happens if
>the parallelism is initially set to the scaled-up value?
>- Keeping the job running a while after the scale-up, does the
>skew ease?
>
> I suspect the performance difference might be an outcome of some
> warming up issues. E.g., the existing TMs might have some file already
> localized, or some memory buffers already promoted to the JVM tenured 
> area,
> while the new TMs have not.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin 
> wrote:
>
>> Hi Experts,
>> I have my flink application running on Kubernetes, initially with 1
>> Job Manager, and 2 Task Managers.
>>
>> Then we have the custom operator that watches for the CRD, when the
>> CRD replicas changed, it will patch the Flink Job Manager deployment
>> parallelism and max parallelism according to the replicas from CRD
>> (parallelism can be configured via env variables for our application).
>> which causes the job manager restart. hence a new Flink job. But the
>> consumer group does not change, so it will continue from the offset
>> where it left.
>>
>> In addition, operator will also update Task Manager's deployment
>> replicas, and will adjust the pod number.
>>
>> In case of scale up, the existing task manager pods do not get
>> killed, but new task manager pods will be created.
>>
>> And we observed a skew in the partition offset consumed. e.g. some
>> partitions have huge lags and other partitions have small lags. (observed
>> from burrow)
>>
>> This is also validated by the metrics from Flink UI, showing the
>> throughput differs for slotss
>>
>> Any clue why this is the case?
>>
>> Thanks a lot!
>> Eleanore
>>
>
>>
>


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 zhisheng
http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-11-133919.png

我看现在还不支持 per job 模式,哎

zhisheng  于2020年3月11日周三 下午9:31写道:

> 好的,我先去 look look,感谢
>
> Kurt Young  于2020年3月11日周三 下午9:30写道:
>
>> https://github.com/ververica/flink-sql-gateway  了解一下
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 11, 2020 at 9:26 PM zhisheng  wrote:
>>
>> > hi, Kurt Young
>> >
>> > 除了使用 sql-client 可以使用纯 SQL
>> 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行
>> > sql-client
>> >
>> > Kurt Young  于2020年3月11日周三 下午7:59写道:
>> >
>> > > 那有可能是可以的,你可以试试看
>> > >
>> > > Best,
>> > > Kurt
>> > >
>> > >
>> > > On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
>> > > wangl...@geekplus.com.cn> wrote:
>> > >
>> > > > Hi Kurt,
>> > > >
>> > > > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从
>> state
>> > > > 中恢复的功能吗?
>> > > > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state
>> > > > 存储并且再次提交任务可以被访问到直接用吗?
>> > > >
>> > > > 谢谢,
>> > > > 王磊
>> > > >
>> > > > --
>> > > > wangl...@geekplus.com.cn
>> > > >
>> > > >
>> > > > *Sender:* Kurt Young 
>> > > > *Send Time:* 2020-03-11 12:54
>> > > > *Receiver:* wangl...@geekplus.com.cn
>> > > > *cc:* user-zh 
>> > > > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
>> > > > sql client 目前还不支持这个功能。
>> > > >
>> > > > Best,
>> > > > Kurt
>> > > >
>> > > >
>> > > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
>> > > > wangl...@geekplus.com.cn> wrote:
>> > > >
>> > > >> Hi Kurt,
>> > > >> 确实是可以 直接 flink  cancel -s 保存状态。
>> > > >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
>> > > >>
>> > > >> 谢谢,
>> > > >> 王磊
>> > > >>
>> > > >>
>> > > >> *Sender:* Kurt Young 
>> > > >> *Send Time:* 2020-03-11 10:38
>> > > >> *Receiver:* user-zh 
>> > > >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
>> > > >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
>> > > >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
>> > > >>
>> > > >> Best,
>> > > >> Kurt
>> > > >>
>> > > >>
>> > > >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
>> > > >> wangl...@geekplus.com.cn> wrote:
>> > > >>
>> > > >> > 有两个表:
>> > > >> > tableA: key  valueA
>> > > >> > tableB: key  valueB
>> > > >> >
>> > > >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到
>> > valueA
>> > > >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
>> > > >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>> > > >> >
>> > > >> > 谢谢,
>> > > >> > 王磊
>> > > >> >
>> > > >>
>> > > >>
>> > >
>> >
>>
>


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 zhisheng
好的,我先去 look look,感谢

Kurt Young  于2020年3月11日周三 下午9:30写道:

> https://github.com/ververica/flink-sql-gateway  了解一下
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 9:26 PM zhisheng  wrote:
>
> > hi, Kurt Young
> >
> > 除了使用 sql-client 可以使用纯 SQL 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行
> > sql-client
> >
> > Kurt Young  于2020年3月11日周三 下午7:59写道:
> >
> > > 那有可能是可以的,你可以试试看
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
> > > wangl...@geekplus.com.cn> wrote:
> > >
> > > > Hi Kurt,
> > > >
> > > > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state
> > > > 中恢复的功能吗?
> > > > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state
> > > > 存储并且再次提交任务可以被访问到直接用吗?
> > > >
> > > > 谢谢,
> > > > 王磊
> > > >
> > > > --
> > > > wangl...@geekplus.com.cn
> > > >
> > > >
> > > > *Sender:* Kurt Young 
> > > > *Send Time:* 2020-03-11 12:54
> > > > *Receiver:* wangl...@geekplus.com.cn
> > > > *cc:* user-zh 
> > > > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> > > > sql client 目前还不支持这个功能。
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
> > > > wangl...@geekplus.com.cn> wrote:
> > > >
> > > >> Hi Kurt,
> > > >> 确实是可以 直接 flink  cancel -s 保存状态。
> > > >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
> > > >>
> > > >> 谢谢,
> > > >> 王磊
> > > >>
> > > >>
> > > >> *Sender:* Kurt Young 
> > > >> *Send Time:* 2020-03-11 10:38
> > > >> *Receiver:* user-zh 
> > > >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> > > >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
> > > >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
> > > >>
> > > >> Best,
> > > >> Kurt
> > > >>
> > > >>
> > > >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
> > > >> wangl...@geekplus.com.cn> wrote:
> > > >>
> > > >> > 有两个表:
> > > >> > tableA: key  valueA
> > > >> > tableB: key  valueB
> > > >> >
> > > >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到
> > valueA
> > > >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> > > >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
> > > >> >
> > > >> > 谢谢,
> > > >> > 王磊
> > > >> >
> > > >>
> > > >>
> > >
> >
>


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 Kurt Young
https://github.com/ververica/flink-sql-gateway  了解一下

Best,
Kurt


On Wed, Mar 11, 2020 at 9:26 PM zhisheng  wrote:

> hi, Kurt Young
>
> 除了使用 sql-client 可以使用纯 SQL 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行
> sql-client
>
> Kurt Young  于2020年3月11日周三 下午7:59写道:
>
> > 那有可能是可以的,你可以试试看
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
> > wangl...@geekplus.com.cn> wrote:
> >
> > > Hi Kurt,
> > >
> > > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state
> > > 中恢复的功能吗?
> > > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state
> > > 存储并且再次提交任务可以被访问到直接用吗?
> > >
> > > 谢谢,
> > > 王磊
> > >
> > > --
> > > wangl...@geekplus.com.cn
> > >
> > >
> > > *Sender:* Kurt Young 
> > > *Send Time:* 2020-03-11 12:54
> > > *Receiver:* wangl...@geekplus.com.cn
> > > *cc:* user-zh 
> > > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> > > sql client 目前还不支持这个功能。
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
> > > wangl...@geekplus.com.cn> wrote:
> > >
> > >> Hi Kurt,
> > >> 确实是可以 直接 flink  cancel -s 保存状态。
> > >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
> > >>
> > >> 谢谢,
> > >> 王磊
> > >>
> > >>
> > >> *Sender:* Kurt Young 
> > >> *Send Time:* 2020-03-11 10:38
> > >> *Receiver:* user-zh 
> > >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> > >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
> > >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
> > >>
> > >> Best,
> > >> Kurt
> > >>
> > >>
> > >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
> > >> wangl...@geekplus.com.cn> wrote:
> > >>
> > >> > 有两个表:
> > >> > tableA: key  valueA
> > >> > tableB: key  valueB
> > >> >
> > >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到
> valueA
> > >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> > >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
> > >> >
> > >> > 谢谢,
> > >> > 王磊
> > >> >
> > >>
> > >>
> >
>


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 zhisheng
hi, Kurt Young

除了使用 sql-client 可以使用纯 SQL 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行
sql-client

Kurt Young  于2020年3月11日周三 下午7:59写道:

> 那有可能是可以的,你可以试试看
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
> > Hi Kurt,
> >
> > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state
> > 中恢复的功能吗?
> > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state
> > 存储并且再次提交任务可以被访问到直接用吗?
> >
> > 谢谢,
> > 王磊
> >
> > --
> > wangl...@geekplus.com.cn
> >
> >
> > *Sender:* Kurt Young 
> > *Send Time:* 2020-03-11 12:54
> > *Receiver:* wangl...@geekplus.com.cn
> > *cc:* user-zh 
> > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> > sql client 目前还不支持这个功能。
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
> > wangl...@geekplus.com.cn> wrote:
> >
> >> Hi Kurt,
> >> 确实是可以 直接 flink  cancel -s 保存状态。
> >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
> >>
> >> 谢谢,
> >> 王磊
> >>
> >>
> >> *Sender:* Kurt Young 
> >> *Send Time:* 2020-03-11 10:38
> >> *Receiver:* user-zh 
> >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
> >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
> >>
> >> Best,
> >> Kurt
> >>
> >>
> >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
> >> wangl...@geekplus.com.cn> wrote:
> >>
> >> > 有两个表:
> >> > tableA: key  valueA
> >> > tableB: key  valueB
> >> >
> >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
> >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
> >> >
> >> > 谢谢,
> >> > 王磊
> >> >
> >>
> >>
>


Re: 关于Flink 命令行参数广播的问题

2020-03-11 文章 zhisheng
hi,aven.wu

可以使用 ParameterTool
获取到传入的参数,然后通过 env.getConfig().setGlobalJobParameters(parameterTool);

在算子中可以在 open 方法里面通过
getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); 获取到配置

aven.wu  于2020年3月11日周三 下午3:42写道:

> Hi,大家好!
> 遇到一个问题,在使用flink run
> 提交任务时命令行参数也是就那些被main方法接收的那些,不知道如何在taskManager获取到。导致必须从外部系统(hdfs,文件)来读取例如ES,mysql等连接配置。Flink是否有这种广播命令行参数的能力,如果没有是否会考虑加入这个特性。
>
> Best
> Aven
>
>


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 Kurt Young
那有可能是可以的,你可以试试看

Best,
Kurt


On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Hi Kurt,
>
> 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state
> 中恢复的功能吗?
> 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state
> 存储并且再次提交任务可以被访问到直接用吗?
>
> 谢谢,
> 王磊
>
> --
> wangl...@geekplus.com.cn
>
>
> *Sender:* Kurt Young 
> *Send Time:* 2020-03-11 12:54
> *Receiver:* wangl...@geekplus.com.cn
> *cc:* user-zh 
> *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> sql client 目前还不支持这个功能。
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>> Hi Kurt,
>> 确实是可以 直接 flink  cancel -s 保存状态。
>> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
>>
>> 谢谢,
>> 王磊
>>
>>
>> *Sender:* Kurt Young 
>> *Send Time:* 2020-03-11 10:38
>> *Receiver:* user-zh 
>> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
>> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
>> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>> > 有两个表:
>> > tableA: key  valueA
>> > tableB: key  valueB
>> >
>> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
>> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
>> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>> >
>> > 谢谢,
>> > 王磊
>> >
>>
>>


Re: Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

2020-03-11 文章 wangl...@geekplus.com.cn

Thanks Jark,

No word to express my '囧'.


wangl...@geekplus.com.cn 

Sender: Jark Wu
Send Time: 2020-03-11 18:32
Receiver: wangl...@geekplus.com.cn
cc: user; user-zh
Subject: Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别
Hi Lei, 

The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong 
field name in the DDL. 
It should be "input_date", not "intput_date".

Best,
Jark

On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn 
 wrote:

Sorry i sent the Chinese written email to user@ 
Let me translate it to English.

I  create a table using sql-client from kafka topic:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I send message to the topic: 
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
But the input_date is not recognized on the sql-client and is null, even i  
tried   1583828700240  "2020-03-11 13:00:00"  "2020-03-11 13:00:00.000" 

How should the timestamp(3) look like in the json message?

Thanks,
Lei
 




wangl...@geekplus.com.cn
 
发件人: wangl...@geekplus.com.cn
发送时间: 2020-03-11 17:41
收件人: user
主题: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

用 sql-client create 了一个 kafka table:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
然后往 kafka 这个 topic 
发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
input_date 在 sql-clinet 端始终是 NULL. 
我把 发送的 input_date 改成 1583828700240  "2020-03-11 13:00:00"  "2020-03-11 
13:00:00.000" 也都不行。 
这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢?

谢谢,
王磊



wangl...@geekplus.com.cn


Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?

2020-03-11 文章 wangl...@geekplus.com.cn
Hi Kurt,

如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state 中恢复的功能吗?
代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state 
存储并且再次提交任务可以被访问到直接用吗?

谢谢,
王磊



wangl...@geekplus.com.cn
 
Sender: Kurt Young
Send Time: 2020-03-11 12:54
Receiver: wangl...@geekplus.com.cn
cc: user-zh
Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
sql client 目前还不支持这个功能。

Best,
Kurt


On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn 
 wrote:
Hi Kurt, 
确实是可以 直接 flink  cancel -s 保存状态。
但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?

谢谢,
王磊
 
Sender: Kurt Young
Send Time: 2020-03-11 10:38
Receiver: user-zh
Subject: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
 
Best,
Kurt
 
 
On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:
 
> 有两个表:
> tableA: key  valueA
> tableB: key  valueB
>
> 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
> 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>
> 谢谢,
> 王磊
>


Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

2020-03-11 文章 Jark Wu
Hi Lei,

The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong
field name in the DDL.
It should be "input_date", not "intput_date".

Best,
Jark

On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> Sorry i sent the Chinese written email to user@
> Let me translate it to English.
>
> I  create a table using sql-client from kafka topic:
>
> CREATE TABLE order_status (
>   out_order_code VARCHAR,
>   intput_date TIMESTAMP(3),
>   owner_code VARCHAR,
>   status INT
>   ) WITH (
>   'connector.type' = 'kafka',
>
> .
>   'format.type' = 'json',
>
>   'format.derive-schema' = 'true'
>
> )
>
> Then I send message to the topic:
> {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
> But the input_date is not recognized on the sql-client and is null, even i
>  tried   1583828700240  "2020-03-11 13:00:00"  "2020-03-11 13:00:00.000"
>
> How should the timestamp(3) look like in the json message?
>
> Thanks,
> Lei
>
>
> --
>
> wangl...@geekplus.com.cn
>
>
> *发件人:* wangl...@geekplus.com.cn
> *发送时间:* 2020-03-11 17:41
> *收件人:* user 
> *主题:* json 中 timestamp 类型在json中怎样写才能被 flink sql 识别
>
> 用 sql-client create 了一个 kafka table:
>
> CREATE TABLE order_status (
>   out_order_code VARCHAR,
>   intput_date TIMESTAMP(3),
>   owner_code VARCHAR,
>   status INT
>   ) WITH (
>   'connector.type' = 'kafka',
>
> .
>   'format.type' = 'json',
>
>   'format.derive-schema' = 'true'
>
> )
>
> 然后往 kafka 这个 topic
> 发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
> input_date 在 sql-clinet 端始终是 NULL.
> 我把 发送的 input_date 改成 1583828700240  "2020-03-11 13:00:00"  "2020-03-11
> 13:00:00.000" 也都不行。
> 这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢?
>
> 谢谢,
> 王磊
>
> --
> wangl...@geekplus.com.cn
>
>


回复: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

2020-03-11 文章 wangl...@geekplus.com.cn

Sorry i sent the Chinese written email to user@ 
Let me translate it to English.

I  create a table using sql-client from kafka topic:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
Then I send message to the topic: 
{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
But the input_date is not recognized on the sql-client and is null, even i  
tried   1583828700240  "2020-03-11 13:00:00"  "2020-03-11 13:00:00.000" 

How should the timestamp(3) look like in the json message?

Thanks,
Lei
 




wangl...@geekplus.com.cn
 
发件人: wangl...@geekplus.com.cn
发送时间: 2020-03-11 17:41
收件人: user
主题: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别

用 sql-client create 了一个 kafka table:
CREATE TABLE order_status (
  out_order_code VARCHAR,
  intput_date TIMESTAMP(3),
  owner_code VARCHAR,
  status INT
  ) WITH (
  'connector.type' = 'kafka',.
  'format.type' = 'json',  'format.derive-schema' = 'true'   )
然后往 kafka 这个 topic 
发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90}
input_date 在 sql-clinet 端始终是 NULL. 
我把 发送的 input_date 改成 1583828700240  "2020-03-11 13:00:00"  "2020-03-11 
13:00:00.000" 也都不行。 
这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢?

谢谢,
王磊



wangl...@geekplus.com.cn


FlinkSQL 1.10 DDL无法指定水印

2020-03-11 文章 周炎
DDL语句如下:
CREATE TABLE ods_usage_naga_dsp_filter (
request row<`value` row> >>>,
event_ts as to_timestamp(concat(`date`,`time`),'-MM-ddHH:mm:ss'),
WATERMARK FOR event_ts AS event_ts - interval '60' second
)WITH (
'connector.type' = 'kafka',
'format.fail-on-missing-field'='false',
'connector.version' = 'universal',
'connector.topic' = 'xxfilter',
'connector.startup-mode' = 'latest-offset',
'connector.properties.zookeeper.connect'='xx:2181',
'connector.properties.bootstrap.servers'='xx:9092',
'connector.properties.group.id'='xx_test',
'update-mode' = 'append',
'format.type' = 'json'
);
启动程序报错:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: From line 3, column 37 to line 3, column 42: Unknown
identifier 'date'
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3,
column 37 to line 3, column 42: Unknown identifier 'date'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at
org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:236)
at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407)
at org.apache.calcite.sql.SqlFunction.validateCall(SqlFunction.java:200)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5304)
at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:116)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:943)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:490)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at com.cootek.streaming.flink.SQLSubmit$.callCreateTable(SQLSubmit.scala:31)
at com.cootek.streaming.flink.SQLSubmit$.callCommand(SQLSubmit.scala:15)
at
com.cootek.streaming.FlinkBootstrap$$anonfun$1.app

关于Flink 命令行参数广播的问题

2020-03-11 文章 aven . wu
Hi,大家好!
遇到一个问题,在使用flink run 
提交任务时命令行参数也是就那些被main方法接收的那些,不知道如何在taskManager获取到。导致必须从外部系统(hdfs,文件)来读取例如ES,mysql等连接配置。Flink是否有这种广播命令行参数的能力,如果没有是否会考虑加入这个特性。

Best
Aven



Flink有类似storm主动fail的机制吗?

2020-03-11 文章 Sun.Zhu
Hi,
Flink有类似storm主动fail的机制吗?
没有的话,有什么好的实现方案吗?比如用状态存储失败的记录?






感谢您的回复
| |
Sun.Zhu
|
|
邮箱:17626017...@163.com
|

Signature is customized by Netease Mail Master

[SURVEY] 您在使用什么数据变更同步工具(CDC)?

2020-03-11 文章 Jark Wu
Hi, 大家好,

Flink 社区目前正在对接一些 CDC (Change Data Capture) 工具,以期在下个版本能支持读取和处理常见的 binlog
数据,所以需要调研下大家目前主要使用的 CDC 工具是什么。

欢迎大家填下问卷调查,您的反馈对我们非常重要,谢谢!

http://apacheflink.mikecrm.com/wDivVQ1

也欢迎大家在这个邮件下讨论关于 Flink 对接 CDC 的一些想法、需求、期望。

Best,
Jark