Re: Re: flinkSQL join表的历史信息保存在哪里保存多久
Thanks, it works. wangl...@geekplus.com.cn Sender: sunfulin Send Time: 2020-03-12 14:19 Receiver: user-zh; wanglei2 cc: jinhai.me Subject: Re:Re: Re: flinkSQL join表的历史信息保存在哪里保存多久 这样来用: StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max); 在 2020-03-12 14:11:31,"wangl...@geekplus.com.cn" 写道: > >这个文档是最新的吗,我直接在 IDEA 里面写这三行代码。 >StreamQueryConfig Deprecated, tableEnv 没有 queryConfig() 方法 >StreamExecutionEnvironment env = >StreamExecutionEnvironment.getExecutionEnvironment(); >StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); >StreamQueryConfig qConfig = tableEnv.queryConfig(); > > > >wangl...@geekplus.com.cn > > >Sender: jinhai wang >Send Time: 2020-03-12 13:44 >Receiver: user-zh@flink.apache.org >Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久 >应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time > > >在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入: > > >两个从 kafka 创建的表: > >tableA: key valueA >tableB: key valueB > >用 flink sql 提交job 运行: select tableA.key, tableA.valueA,tableB.valueB from > tableA join tableB on tableA.key = tableB.key; >这两个表的历史数据在 flink 中存在哪里?存多久呢? > >比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗? > >谢谢, >王磊 > > > >wangl...@geekplus.com.cn > >
Re:Re: Re: flinkSQL join表的历史信息保存在哪里保存多久
这样来用: StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max); 在 2020-03-12 14:11:31,"wangl...@geekplus.com.cn" 写道: > >这个文档是最新的吗,我直接在 IDEA 里面写这三行代码。 >StreamQueryConfig Deprecated, tableEnv 没有 queryConfig() 方法 >StreamExecutionEnvironment env = >StreamExecutionEnvironment.getExecutionEnvironment(); >StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); >StreamQueryConfig qConfig = tableEnv.queryConfig(); > > > >wangl...@geekplus.com.cn > > >Sender: jinhai wang >Send Time: 2020-03-12 13:44 >Receiver: user-zh@flink.apache.org >Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久 >应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time > > >在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入: > > >两个从 kafka 创建的表: > >tableA: key valueA >tableB: key valueB > >用 flink sql 提交job 运行: select tableA.key, tableA.valueA,tableB.valueB from > tableA join tableB on tableA.key = tableB.key; >这两个表的历史数据在 flink 中存在哪里?存多久呢? > >比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗? > >谢谢, >王磊 > > > >wangl...@geekplus.com.cn > >
Re: Re: flinkSQL join表的历史信息保存在哪里保存多久
这个文档是最新的吗,我直接在 IDEA 里面写这三行代码。 StreamQueryConfig Deprecated, tableEnv 没有 queryConfig() 方法 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); StreamQueryConfig qConfig = tableEnv.queryConfig(); wangl...@geekplus.com.cn Sender: jinhai wang Send Time: 2020-03-12 13:44 Receiver: user-zh@flink.apache.org Subject: Re: flinkSQL join表的历史信息保存在哪里保存多久 应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time 在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入: 两个从 kafka 创建的表: tableA: key valueA tableB: key valueB 用 flink sql 提交job 运行: select tableA.key, tableA.valueA,tableB.valueB from tableA join tableB on tableA.key = tableB.key; 这两个表的历史数据在 flink 中存在哪里?存多久呢? 比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: flinkSQL join 表的历史信息保存在哪里保存多久
应该是withIdleStateRetentionTime参数的配置时间。具体文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html#idle-state-retention-time 在 2020/3/12 下午12:37,“wangl...@geekplus.com.cn” 写入: 两个从 kafka 创建的表: tableA: key valueA tableB: key valueB 用 flink sql 提交job 运行: select tableA.key, tableA.valueA,tableB.valueB from tableA join tableB on tableA.key = tableB.key; 这两个表的历史数据在 flink 中存在哪里?存多久呢? 比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗? 谢谢, 王磊 wangl...@geekplus.com.cn
flinkSQL join 表的历史信息保存在哪里保存多久
两个从 kafka 创建的表: tableA: key valueA tableB: key valueB 用 flink sql 提交job 运行: select tableA.key, tableA.valueA,tableB.valueB from tableA join tableB on tableA.key = tableB.key; 这两个表的历史数据在 flink 中存在哪里?存多久呢? 比如我 tableA key1 先出现,很长时间以后(一个月)tableB key1 才出现,这时候还能 join 上吗? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
我试了下,是可以的。 Thanks wangl...@geekplus.com.cn Sender: Kurt Young Send Time: 2020-03-11 19:59 Receiver: wangl...@geekplus.com.cn cc: user-zh Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? 那有可能是可以的,你可以试试看 Best, Kurt On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn wrote: Hi Kurt, 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state 中恢复的功能吗? 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state 存储并且再次提交任务可以被访问到直接用吗? 谢谢, 王磊 wangl...@geekplus.com.cn Sender: Kurt Young Send Time: 2020-03-11 12:54 Receiver: wangl...@geekplus.com.cn cc: user-zh Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? sql client 目前还不支持这个功能。 Best, Kurt On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn wrote: Hi Kurt, 确实是可以 直接 flink cancel -s 保存状态。 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢? 谢谢, 王磊 Sender: Kurt Young Send Time: 2020-03-11 10:38 Receiver: user-zh Subject: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。 Best, Kurt On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > 有两个表: > tableA: key valueA > tableB: key valueB > > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。 > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢? > > 谢谢, > 王磊 >
Re: Re: flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 不兼容问题咨询
应该只能改ContinuousFileMonitoringFunction源码 , 支持多path 王智 于2020年3月4日周三 下午6:34写道: > 我的需求是2,现在我使用的是execEnv.createInput(inputFormat()), > > 我先去试试 env.addSource(new InputFormatSourceFunction(..)...)。 > > 多谢~ > > > > > > > > > 原始邮件 > > > 发件人:"JingsongLee"< lzljs3620...@aliyun.com.INVALID >; > > 发件时间:2020/3/4 17:40 > > 收件人:"user-zh"< user-zh@flink.apache.org >; > > 主题: Re: flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file > path 不兼容问题咨询 > > > > Hi, 你的需求是什么?下列哪种? - 1.想用unbounded source,continuous的file > source,监控文件夹,发送新文件,且需要支持多文件夹 - 2.只是想用bounded的input format,需要支持多文件 > 如果是1,现在仍然不支持。 如果是2,那你可以用env.addSource(new > InputFormatSourceFunction(..)...)来支持多文件。 Best, Jingsong Lee > -- > From:王智Send Time:2020年3月4日(星期三) 17:34 To:user-zhSubject:flink 1.8 > 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 不兼容问题咨询 > 我在使用flink 1.8 自定义 FileInputFormat > 的时候遇到了不兼容问题,初步看了源代码,不知道自己得出的结论是否正确,并想了解一下后续趋势和进展,麻烦各位大佬抽空指点一下,先行感谢~~ 问题1: > StreamExecutionEnvironment 为什么要做这样的限制?ContinuousFileMonitoringFunction > 的作用是什么? 相关的代码描述如下 StreamExecutionEnvironment 对 FileInputFormat 对象有特殊的处理逻辑 > if (inputFormat instanceof FileInputFormat) { > @SuppressWarnings("unchecked") FileInputFormat
Re: 关于Flink 命令行参数广播的问题
Hi Aven, 静态字段的使用可能会很 tricky,因为只有同一个 task 的代码才运行在同一个 classloader 里。我见过想用静态字段做全局 Map 存储的,那个实际上只有并行度设置为 1 的时候语义才对。 你说启动的生命周期执行一些用户代码,那其实就是 RichFunction 的 open 方法,它就是设计来做这个的。具体可以看你的实际业务,未必要搞得这么奇怪(x Best, tison. aven.wu 于2020年3月12日周四 上午10:54写道: > Hello > > 还有一个问题,除了在算子的Open方法中获取这个参数还有别的地方可以获取吗?或者在Gobgraph启动的生命周期里面有哪个阶段可以被调用提执行一些用户代码。 > > 我的需求是需要通过命令行参数初始化一些静态类的属性,全局的静态类会被算子调用以执行某些通用的功能,如果我在open方法中去初始化的话是不太优雅,并且为了保证可用必须在每个算子的Open方法中都调用,对于一些非Rich的算子使用静态方法就会有问题。 > > Best > Aven > > 发件人: zhisheng > 发送时间: 2020年3月11日 21:16 > 收件人: user-zh > 主题: Re: 关于Flink 命令行参数广播的问题 > > hi,aven.wu > > 可以使用 ParameterTool > 获取到传入的参数,然后通过 env.getConfig().setGlobalJobParameters(parameterTool); > > 在算子中可以在 open 方法里面通过 > getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); 获取到配置 > > aven.wu 于2020年3月11日周三 下午3:42写道: > > > Hi,大家好! > > 遇到一个问题,在使用flink run > > > 提交任务时命令行参数也是就那些被main方法接收的那些,不知道如何在taskManager获取到。导致必须从外部系统(hdfs,文件)来读取例如ES,mysql等连接配置。Flink是否有这种广播命令行参数的能力,如果没有是否会考虑加入这个特性。 > > > > Best > > Aven > > > > > >
回复: 关于Flink 命令行参数广播的问题
Hello 还有一个问题,除了在算子的Open方法中获取这个参数还有别的地方可以获取吗?或者在Gobgraph启动的生命周期里面有哪个阶段可以被调用提执行一些用户代码。 我的需求是需要通过命令行参数初始化一些静态类的属性,全局的静态类会被算子调用以执行某些通用的功能,如果我在open方法中去初始化的话是不太优雅,并且为了保证可用必须在每个算子的Open方法中都调用,对于一些非Rich的算子使用静态方法就会有问题。 Best Aven 发件人: zhisheng 发送时间: 2020年3月11日 21:16 收件人: user-zh 主题: Re: 关于Flink 命令行参数广播的问题 hi,aven.wu 可以使用 ParameterTool 获取到传入的参数,然后通过 env.getConfig().setGlobalJobParameters(parameterTool); 在算子中可以在 open 方法里面通过 getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); 获取到配置 aven.wu 于2020年3月11日周三 下午3:42写道: > Hi,大家好! > 遇到一个问题,在使用flink run > 提交任务时命令行参数也是就那些被main方法接收的那些,不知道如何在taskManager获取到。导致必须从外部系统(hdfs,文件)来读取例如ES,mysql等连接配置。Flink是否有这种广播命令行参数的能力,如果没有是否会考虑加入这个特性。 > > Best > Aven > >
Re: [SURVEY] 您在使用什么数据变更同步工具(CDC)?
Hi Benchao, Great feedbacks! 1) 全量初始化能力: 第一版中社区有计划引入 flink sql 直连 mysql 获取 binlog 的方案,该方案可以获取全量+增量 binlog,且有一致性保证。 但是对接db 全量+ mq binlog,将会是未来的一个工作,主要的难点在于全量如何平滑切换到 增量的 mq offset 上。 2) 自动生成watermark: 这也是 roadmap 上的一个工作。 3) binlog以state的形式存储,只需全量加载,后续只接受增量: 我理解这个和 (1) 是类似的需求。Flink SQL 对接之后 binlog,也即 binlog 数据流转成了一个动态表,也即 Flink 在维护这个动态表的数据(state)。 4) Schema 变更: 这会是生产中非常有用的一个特性。但是目前还没有很清楚的方案。 5) flink作为一个数据同步工具: 这是非常有可能的。基于 Flink 直连 db binlog 的能力,我们可以非常方便地基于该能力搭建出一个同步工具(类似 canal),基于 flink sql 丰富的生态, 可以快速对接各种 connector。这也许将来会成为一个三方库。 Best, Jark On Wed, 11 Mar 2020 at 23:52, Benchao Li wrote: > Hi, > > 感谢Jark发起这个话题的讨论,这个功能对于Flink SQL来讲是一个非常重要的扩展。 > > 问卷已填,再此再提几个小想法: > 1. 希望对接binlog时可以有全量初始化的能力,这样在Flink中我们就有了一个全表的实时状态,方便其他表与之进行join。 > 2. 希望能够自动生成watermark,这样子可以尽可能的减少接入成本。因为有些场景是其他的append > log数据可以跟实时维护的表进行join;也有些场景是两个binlog形成的动态表互相join。 > 3. 希望可以把binlog以state的形式存储在flink里,除了第一次启动需要全量加载,后续的运维都可以再此基础上只接收增量即可。 > 4. 如此之外,如果能有schema变更感知能力是最好的。(当然这个可能很难体现在SQL里面,毕竟SQL作业在启动时就已经确定了table > 的schema) > 5. > > 最后一点,感觉不太符合flink现在的定位,但是可能会有用户会这样来使用。就是直接把flink作为一个数据同步工具,消费binlog,直接同步到其他存储里面。(可能基本不需要做任何加工的那种,而且最好是能够有自动感知schema变更,同时可以变更下游的存储系统的schema) > > Jark Wu 于2020年3月11日周三 下午3:00写道: > > > Hi, 大家好, > > > > Flink 社区目前正在对接一些 CDC (Change Data Capture) 工具,以期在下个版本能支持读取和处理常见的 binlog > > 数据,所以需要调研下大家目前主要使用的 CDC 工具是什么。 > > > > 欢迎大家填下问卷调查,您的反馈对我们非常重要,谢谢! > > > > http://apacheflink.mikecrm.com/wDivVQ1 > > > > 也欢迎大家在这个邮件下讨论关于 Flink 对接 CDC 的一些想法、需求、期望。 > > > > Best, > > Jark > > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: libenc...@gmail.com; libenc...@pku.edu.cn >
Re: FlinkSQL 1.10 DDL无法指定水印
Benchao is right. 嵌套字段是无法直接访问的,需要逐级引用到。 On Thu, 12 Mar 2020 at 00:45, Benchao Li wrote: > Hi 周炎, > > 你的`date` 和 `time`都是在嵌套结构内部的字段,需要用*request.`value`.`date`*和* > request.`value`.`time`*来使用它们。 > > 周炎 于2020年3月11日周三 下午5:42写道: > > > DDL语句如下: > > CREATE TABLE ods_usage_naga_dsp_filter ( > > request row<`value` row > varchar,reqid varchar,source varchar,`time` varchar,`filter` > > array> >>>, > > event_ts as to_timestamp(concat(`date`,`time`),'-MM-ddHH:mm:ss'), > > WATERMARK FOR event_ts AS event_ts - interval '60' second > > )WITH ( > > 'connector.type' = 'kafka', > > 'format.fail-on-missing-field'='false', > > 'connector.version' = 'universal', > > 'connector.topic' = 'xxfilter', > > 'connector.startup-mode' = 'latest-offset', > > 'connector.properties.zookeeper.connect'='xx:2181', > > 'connector.properties.bootstrap.servers'='xx:9092', > > 'connector.properties.group.id'='xx_test', > > 'update-mode' = 'append', > > 'format.type' = 'json' > > ); > > 启动程序报错: > > org.apache.flink.client.program.ProgramInvocationException: The main > method > > caused an error: From line 3, column 37 to line 3, column 42: Unknown > > identifier 'date' > > at > > > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) > > at > > > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) > > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) > > at > > > > > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) > > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > > at > > > > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) > > at > > > > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) > > at java.security.AccessController.doPrivileged(Native Method) > > at javax.security.auth.Subject.doAs(Subject.java:422) > > at > > > > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) > > at > > > > > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) > > Caused by: org.apache.calcite.runtime.CalciteContextException: From line > 3, > > column 37 to line 3, column 42: Unknown identifier 'date' > > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > > at > > > > > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > > at > > > > > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > > at > > > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) > > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) > > at > > > > > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) > > at > > > > > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667) > > at > > > > > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) > > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) > > at > > > > > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) > > at > > > > > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) > > at > > > > > org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593) > > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237) > > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219) > > at > > > > > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) > > at > > > > > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) > > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) > > at > > > > > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) > > at > > > > > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) > > at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:236) > > at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407) > > at org.apache.calcite.sql.SqlFunction.validateCall(SqlFunction.java:200) > > at > > > > > org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5304) > > at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:116) > > at > > > > > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:943) > > at > > > > > org.apache.calcite.sql.validate.SqlValidatorImpl.validatePar
Re: FlinkSQL 1.10 DDL无法指定水印
Hi 周炎, 你的`date` 和 `time`都是在嵌套结构内部的字段,需要用*request.`value`.`date`*和* request.`value`.`time`*来使用它们。 周炎 于2020年3月11日周三 下午5:42写道: > DDL语句如下: > CREATE TABLE ods_usage_naga_dsp_filter ( > request row<`value` row varchar,reqid varchar,source varchar,`time` varchar,`filter` > array> >>>, > event_ts as to_timestamp(concat(`date`,`time`),'-MM-ddHH:mm:ss'), > WATERMARK FOR event_ts AS event_ts - interval '60' second > )WITH ( > 'connector.type' = 'kafka', > 'format.fail-on-missing-field'='false', > 'connector.version' = 'universal', > 'connector.topic' = 'xxfilter', > 'connector.startup-mode' = 'latest-offset', > 'connector.properties.zookeeper.connect'='xx:2181', > 'connector.properties.bootstrap.servers'='xx:9092', > 'connector.properties.group.id'='xx_test', > 'update-mode' = 'append', > 'format.type' = 'json' > ); > 启动程序报错: > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: From line 3, column 37 to line 3, column 42: Unknown > identifier 'date' > at > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) > at > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) > at > > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) > at > > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) > at > > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) > at > > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3, > column 37 to line 3, column 42: Unknown identifier 'date' > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) > at > > org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593) > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237) > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) > at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:236) > at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407) > at org.apache.calcite.sql.SqlFunction.validateCall(SqlFunction.java:200) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5304) > at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:116) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:943) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930) > at > > org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:490) > at > > org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177) > at > > org.apache.flink.table.planner.operations.SqlToOperationConvert
Re: [SURVEY] 您在使用什么数据变更同步工具(CDC)?
Hi, 感谢Jark发起这个话题的讨论,这个功能对于Flink SQL来讲是一个非常重要的扩展。 问卷已填,再此再提几个小想法: 1. 希望对接binlog时可以有全量初始化的能力,这样在Flink中我们就有了一个全表的实时状态,方便其他表与之进行join。 2. 希望能够自动生成watermark,这样子可以尽可能的减少接入成本。因为有些场景是其他的append log数据可以跟实时维护的表进行join;也有些场景是两个binlog形成的动态表互相join。 3. 希望可以把binlog以state的形式存储在flink里,除了第一次启动需要全量加载,后续的运维都可以再此基础上只接收增量即可。 4. 如此之外,如果能有schema变更感知能力是最好的。(当然这个可能很难体现在SQL里面,毕竟SQL作业在启动时就已经确定了table 的schema) 5. 最后一点,感觉不太符合flink现在的定位,但是可能会有用户会这样来使用。就是直接把flink作为一个数据同步工具,消费binlog,直接同步到其他存储里面。(可能基本不需要做任何加工的那种,而且最好是能够有自动感知schema变更,同时可以变更下游的存储系统的schema) Jark Wu 于2020年3月11日周三 下午3:00写道: > Hi, 大家好, > > Flink 社区目前正在对接一些 CDC (Change Data Capture) 工具,以期在下个版本能支持读取和处理常见的 binlog > 数据,所以需要调研下大家目前主要使用的 CDC 工具是什么。 > > 欢迎大家填下问卷调查,您的反馈对我们非常重要,谢谢! > > http://apacheflink.mikecrm.com/wDivVQ1 > > 也欢迎大家在这个邮件下讨论关于 Flink 对接 CDC 的一些想法、需求、期望。 > > Best, > Jark > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: libenc...@gmail.com; libenc...@pku.edu.cn
Re: scaling issue Running Flink on Kubernetes
Hi Flavio, We have implemented our own flink operator, the operator will start a flink job cluster (the application jar is already packaged together with flink in the docker image). I believe Google's flink operator will start a session cluster, and user can submit the flink job via REST. Not looked into lyft one before. Eleanore On Wed, Mar 11, 2020 at 2:21 AM Flavio Pompermaier wrote: > Sorry I wanted to mention https://github.com/lyft/flinkk8soperator (I > don't know which one of the 2 is better) > > On Wed, Mar 11, 2020 at 10:19 AM Flavio Pompermaier > wrote: > >> Have you tried to use existing operators such as >> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator or >> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator? >> >> On Wed, Mar 11, 2020 at 4:46 AM Xintong Song >> wrote: >> >>> Hi Eleanore, >>> >>> That does't sound like a scaling issue. It's probably a data skew, that >>> the data volume on some of the keys are significantly higher than others. >>> I'm not familiar with this area though, and have copied Jark for you, who >>> is one of the community experts in this area. >>> >>> Thank you~ >>> >>> Xintong Song >>> >>> >>> >>> On Wed, Mar 11, 2020 at 10:37 AM Eleanore Jin >>> wrote: >>> _Hi Xintong, Thanks for the prompt reply! To answer your question: - Which Flink version are you using? v1.8.2 - Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value? I also tried this, it seems skew also happens even I do not change the parallelism, so it may not caused by scale-up/down - Keeping the job running a while after the scale-up, does the skew ease? So the skew happens in such a way that: some partitions lags down to 0, but other partitions are still at level of 10_000, and I am seeing the back pressure is ok. Thanks a lot! Eleanore On Tue, Mar 10, 2020 at 7:03 PM Xintong Song wrote: > Hi Eleanore, > > I have a few more questions regarding your issue. > >- Which Flink version are you using? >- Is this skew observed only after a scaling-up? What happens if >the parallelism is initially set to the scaled-up value? >- Keeping the job running a while after the scale-up, does the >skew ease? > > I suspect the performance difference might be an outcome of some > warming up issues. E.g., the existing TMs might have some file already > localized, or some memory buffers already promoted to the JVM tenured > area, > while the new TMs have not. > > Thank you~ > > Xintong Song > > > > On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin > wrote: > >> Hi Experts, >> I have my flink application running on Kubernetes, initially with 1 >> Job Manager, and 2 Task Managers. >> >> Then we have the custom operator that watches for the CRD, when the >> CRD replicas changed, it will patch the Flink Job Manager deployment >> parallelism and max parallelism according to the replicas from CRD >> (parallelism can be configured via env variables for our application). >> which causes the job manager restart. hence a new Flink job. But the >> consumer group does not change, so it will continue from the offset >> where it left. >> >> In addition, operator will also update Task Manager's deployment >> replicas, and will adjust the pod number. >> >> In case of scale up, the existing task manager pods do not get >> killed, but new task manager pods will be created. >> >> And we observed a skew in the partition offset consumed. e.g. some >> partitions have huge lags and other partitions have small lags. (observed >> from burrow) >> >> This is also validated by the metrics from Flink UI, showing the >> throughput differs for slotss >> >> Any clue why this is the case? >> >> Thanks a lot! >> Eleanore >> > >> >
Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-11-133919.png 我看现在还不支持 per job 模式,哎 zhisheng 于2020年3月11日周三 下午9:31写道: > 好的,我先去 look look,感谢 > > Kurt Young 于2020年3月11日周三 下午9:30写道: > >> https://github.com/ververica/flink-sql-gateway 了解一下 >> >> Best, >> Kurt >> >> >> On Wed, Mar 11, 2020 at 9:26 PM zhisheng wrote: >> >> > hi, Kurt Young >> > >> > 除了使用 sql-client 可以使用纯 SQL >> 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行 >> > sql-client >> > >> > Kurt Young 于2020年3月11日周三 下午7:59写道: >> > >> > > 那有可能是可以的,你可以试试看 >> > > >> > > Best, >> > > Kurt >> > > >> > > >> > > On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn < >> > > wangl...@geekplus.com.cn> wrote: >> > > >> > > > Hi Kurt, >> > > > >> > > > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 >> state >> > > > 中恢复的功能吗? >> > > > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state >> > > > 存储并且再次提交任务可以被访问到直接用吗? >> > > > >> > > > 谢谢, >> > > > 王磊 >> > > > >> > > > -- >> > > > wangl...@geekplus.com.cn >> > > > >> > > > >> > > > *Sender:* Kurt Young >> > > > *Send Time:* 2020-03-11 12:54 >> > > > *Receiver:* wangl...@geekplus.com.cn >> > > > *cc:* user-zh >> > > > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? >> > > > sql client 目前还不支持这个功能。 >> > > > >> > > > Best, >> > > > Kurt >> > > > >> > > > >> > > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn < >> > > > wangl...@geekplus.com.cn> wrote: >> > > > >> > > >> Hi Kurt, >> > > >> 确实是可以 直接 flink cancel -s 保存状态。 >> > > >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢? >> > > >> >> > > >> 谢谢, >> > > >> 王磊 >> > > >> >> > > >> >> > > >> *Sender:* Kurt Young >> > > >> *Send Time:* 2020-03-11 10:38 >> > > >> *Receiver:* user-zh >> > > >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? >> > > >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。 >> > > >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。 >> > > >> >> > > >> Best, >> > > >> Kurt >> > > >> >> > > >> >> > > >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn < >> > > >> wangl...@geekplus.com.cn> wrote: >> > > >> >> > > >> > 有两个表: >> > > >> > tableA: key valueA >> > > >> > tableB: key valueB >> > > >> > >> > > >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 >> > valueA >> > > >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。 >> > > >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢? >> > > >> > >> > > >> > 谢谢, >> > > >> > 王磊 >> > > >> > >> > > >> >> > > >> >> > > >> > >> >
Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
好的,我先去 look look,感谢 Kurt Young 于2020年3月11日周三 下午9:30写道: > https://github.com/ververica/flink-sql-gateway 了解一下 > > Best, > Kurt > > > On Wed, Mar 11, 2020 at 9:26 PM zhisheng wrote: > > > hi, Kurt Young > > > > 除了使用 sql-client 可以使用纯 SQL 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行 > > sql-client > > > > Kurt Young 于2020年3月11日周三 下午7:59写道: > > > > > 那有可能是可以的,你可以试试看 > > > > > > Best, > > > Kurt > > > > > > > > > On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn < > > > wangl...@geekplus.com.cn> wrote: > > > > > > > Hi Kurt, > > > > > > > > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state > > > > 中恢复的功能吗? > > > > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state > > > > 存储并且再次提交任务可以被访问到直接用吗? > > > > > > > > 谢谢, > > > > 王磊 > > > > > > > > -- > > > > wangl...@geekplus.com.cn > > > > > > > > > > > > *Sender:* Kurt Young > > > > *Send Time:* 2020-03-11 12:54 > > > > *Receiver:* wangl...@geekplus.com.cn > > > > *cc:* user-zh > > > > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? > > > > sql client 目前还不支持这个功能。 > > > > > > > > Best, > > > > Kurt > > > > > > > > > > > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn < > > > > wangl...@geekplus.com.cn> wrote: > > > > > > > >> Hi Kurt, > > > >> 确实是可以 直接 flink cancel -s 保存状态。 > > > >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢? > > > >> > > > >> 谢谢, > > > >> 王磊 > > > >> > > > >> > > > >> *Sender:* Kurt Young > > > >> *Send Time:* 2020-03-11 10:38 > > > >> *Receiver:* user-zh > > > >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? > > > >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。 > > > >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。 > > > >> > > > >> Best, > > > >> Kurt > > > >> > > > >> > > > >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn < > > > >> wangl...@geekplus.com.cn> wrote: > > > >> > > > >> > 有两个表: > > > >> > tableA: key valueA > > > >> > tableB: key valueB > > > >> > > > > >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 > > valueA > > > >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。 > > > >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢? > > > >> > > > > >> > 谢谢, > > > >> > 王磊 > > > >> > > > > >> > > > >> > > > > > >
Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
https://github.com/ververica/flink-sql-gateway 了解一下 Best, Kurt On Wed, Mar 11, 2020 at 9:26 PM zhisheng wrote: > hi, Kurt Young > > 除了使用 sql-client 可以使用纯 SQL 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行 > sql-client > > Kurt Young 于2020年3月11日周三 下午7:59写道: > > > 那有可能是可以的,你可以试试看 > > > > Best, > > Kurt > > > > > > On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn < > > wangl...@geekplus.com.cn> wrote: > > > > > Hi Kurt, > > > > > > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state > > > 中恢复的功能吗? > > > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state > > > 存储并且再次提交任务可以被访问到直接用吗? > > > > > > 谢谢, > > > 王磊 > > > > > > -- > > > wangl...@geekplus.com.cn > > > > > > > > > *Sender:* Kurt Young > > > *Send Time:* 2020-03-11 12:54 > > > *Receiver:* wangl...@geekplus.com.cn > > > *cc:* user-zh > > > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? > > > sql client 目前还不支持这个功能。 > > > > > > Best, > > > Kurt > > > > > > > > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn < > > > wangl...@geekplus.com.cn> wrote: > > > > > >> Hi Kurt, > > >> 确实是可以 直接 flink cancel -s 保存状态。 > > >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢? > > >> > > >> 谢谢, > > >> 王磊 > > >> > > >> > > >> *Sender:* Kurt Young > > >> *Send Time:* 2020-03-11 10:38 > > >> *Receiver:* user-zh > > >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? > > >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。 > > >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。 > > >> > > >> Best, > > >> Kurt > > >> > > >> > > >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn < > > >> wangl...@geekplus.com.cn> wrote: > > >> > > >> > 有两个表: > > >> > tableA: key valueA > > >> > tableB: key valueB > > >> > > > >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 > valueA > > >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。 > > >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢? > > >> > > > >> > 谢谢, > > >> > 王磊 > > >> > > > >> > > >> > > >
Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
hi, Kurt Young 除了使用 sql-client 可以使用纯 SQL 来执行,有没有其他的办法来执行?因为通常不让本地直接连到生产的环境,也不会直接在生产的机器执行 sql-client Kurt Young 于2020年3月11日周三 下午7:59写道: > 那有可能是可以的,你可以试试看 > > Best, > Kurt > > > On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn < > wangl...@geekplus.com.cn> wrote: > > > Hi Kurt, > > > > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state > > 中恢复的功能吗? > > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state > > 存储并且再次提交任务可以被访问到直接用吗? > > > > 谢谢, > > 王磊 > > > > -- > > wangl...@geekplus.com.cn > > > > > > *Sender:* Kurt Young > > *Send Time:* 2020-03-11 12:54 > > *Receiver:* wangl...@geekplus.com.cn > > *cc:* user-zh > > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? > > sql client 目前还不支持这个功能。 > > > > Best, > > Kurt > > > > > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn < > > wangl...@geekplus.com.cn> wrote: > > > >> Hi Kurt, > >> 确实是可以 直接 flink cancel -s 保存状态。 > >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢? > >> > >> 谢谢, > >> 王磊 > >> > >> > >> *Sender:* Kurt Young > >> *Send Time:* 2020-03-11 10:38 > >> *Receiver:* user-zh > >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? > >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。 > >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。 > >> > >> Best, > >> Kurt > >> > >> > >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn < > >> wangl...@geekplus.com.cn> wrote: > >> > >> > 有两个表: > >> > tableA: key valueA > >> > tableB: key valueB > >> > > >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA > >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。 > >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢? > >> > > >> > 谢谢, > >> > 王磊 > >> > > >> > >> >
Re: 关于Flink 命令行参数广播的问题
hi,aven.wu 可以使用 ParameterTool 获取到传入的参数,然后通过 env.getConfig().setGlobalJobParameters(parameterTool); 在算子中可以在 open 方法里面通过 getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); 获取到配置 aven.wu 于2020年3月11日周三 下午3:42写道: > Hi,大家好! > 遇到一个问题,在使用flink run > 提交任务时命令行参数也是就那些被main方法接收的那些,不知道如何在taskManager获取到。导致必须从外部系统(hdfs,文件)来读取例如ES,mysql等连接配置。Flink是否有这种广播命令行参数的能力,如果没有是否会考虑加入这个特性。 > > Best > Aven > >
Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
那有可能是可以的,你可以试试看 Best, Kurt On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > Hi Kurt, > > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state > 中恢复的功能吗? > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state > 存储并且再次提交任务可以被访问到直接用吗? > > 谢谢, > 王磊 > > -- > wangl...@geekplus.com.cn > > > *Sender:* Kurt Young > *Send Time:* 2020-03-11 12:54 > *Receiver:* wangl...@geekplus.com.cn > *cc:* user-zh > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? > sql client 目前还不支持这个功能。 > > Best, > Kurt > > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn < > wangl...@geekplus.com.cn> wrote: > >> Hi Kurt, >> 确实是可以 直接 flink cancel -s 保存状态。 >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢? >> >> 谢谢, >> 王磊 >> >> >> *Sender:* Kurt Young >> *Send Time:* 2020-03-11 10:38 >> *Receiver:* user-zh >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。 >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。 >> >> Best, >> Kurt >> >> >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn < >> wangl...@geekplus.com.cn> wrote: >> >> > 有两个表: >> > tableA: key valueA >> > tableB: key valueB >> > >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。 >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢? >> > >> > 谢谢, >> > 王磊 >> > >> >>
Re: Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别
Thanks Jark, No word to express my '囧'. wangl...@geekplus.com.cn Sender: Jark Wu Send Time: 2020-03-11 18:32 Receiver: wangl...@geekplus.com.cn cc: user; user-zh Subject: Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别 Hi Lei, The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong field name in the DDL. It should be "input_date", not "intput_date". Best, Jark On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn wrote: Sorry i sent the Chinese written email to user@ Let me translate it to English. I create a table using sql-client from kafka topic: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) Then I send message to the topic: {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} But the input_date is not recognized on the sql-client and is null, even i tried 1583828700240 "2020-03-11 13:00:00" "2020-03-11 13:00:00.000" How should the timestamp(3) look like in the json message? Thanks, Lei wangl...@geekplus.com.cn 发件人: wangl...@geekplus.com.cn 发送时间: 2020-03-11 17:41 收件人: user 主题: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别 用 sql-client create 了一个 kafka table: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) 然后往 kafka 这个 topic 发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} input_date 在 sql-clinet 端始终是 NULL. 我把 发送的 input_date 改成 1583828700240 "2020-03-11 13:00:00" "2020-03-11 13:00:00.000" 也都不行。 这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢? 谢谢, 王磊 wangl...@geekplus.com.cn
Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
Hi Kurt, 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state 中恢复的功能吗? 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state 存储并且再次提交任务可以被访问到直接用吗? 谢谢, 王磊 wangl...@geekplus.com.cn Sender: Kurt Young Send Time: 2020-03-11 12:54 Receiver: wangl...@geekplus.com.cn cc: user-zh Subject: Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? sql client 目前还不支持这个功能。 Best, Kurt On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn wrote: Hi Kurt, 确实是可以 直接 flink cancel -s 保存状态。 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢? 谢谢, 王磊 Sender: Kurt Young Send Time: 2020-03-11 10:38 Receiver: user-zh Subject: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗? 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。 Best, Kurt On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > 有两个表: > tableA: key valueA > tableB: key valueB > > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。 > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢? > > 谢谢, > 王磊 >
Re: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别
Hi Lei, The "2020-03-11T13:00:00.123Z" format is correct, but you defined the wrong field name in the DDL. It should be "input_date", not "intput_date". Best, Jark On Wed, 11 Mar 2020 at 17:52, wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > > Sorry i sent the Chinese written email to user@ > Let me translate it to English. > > I create a table using sql-client from kafka topic: > > CREATE TABLE order_status ( > out_order_code VARCHAR, > intput_date TIMESTAMP(3), > owner_code VARCHAR, > status INT > ) WITH ( > 'connector.type' = 'kafka', > > . > 'format.type' = 'json', > > 'format.derive-schema' = 'true' > > ) > > Then I send message to the topic: > {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} > But the input_date is not recognized on the sql-client and is null, even i > tried 1583828700240 "2020-03-11 13:00:00" "2020-03-11 13:00:00.000" > > How should the timestamp(3) look like in the json message? > > Thanks, > Lei > > > -- > > wangl...@geekplus.com.cn > > > *发件人:* wangl...@geekplus.com.cn > *发送时间:* 2020-03-11 17:41 > *收件人:* user > *主题:* json 中 timestamp 类型在json中怎样写才能被 flink sql 识别 > > 用 sql-client create 了一个 kafka table: > > CREATE TABLE order_status ( > out_order_code VARCHAR, > intput_date TIMESTAMP(3), > owner_code VARCHAR, > status INT > ) WITH ( > 'connector.type' = 'kafka', > > . > 'format.type' = 'json', > > 'format.derive-schema' = 'true' > > ) > > 然后往 kafka 这个 topic > 发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} > input_date 在 sql-clinet 端始终是 NULL. > 我把 发送的 input_date 改成 1583828700240 "2020-03-11 13:00:00" "2020-03-11 > 13:00:00.000" 也都不行。 > 这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢? > > 谢谢, > 王磊 > > -- > wangl...@geekplus.com.cn > >
回复: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别
Sorry i sent the Chinese written email to user@ Let me translate it to English. I create a table using sql-client from kafka topic: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) Then I send message to the topic: {"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} But the input_date is not recognized on the sql-client and is null, even i tried 1583828700240 "2020-03-11 13:00:00" "2020-03-11 13:00:00.000" How should the timestamp(3) look like in the json message? Thanks, Lei wangl...@geekplus.com.cn 发件人: wangl...@geekplus.com.cn 发送时间: 2020-03-11 17:41 收件人: user 主题: json 中 timestamp 类型在json中怎样写才能被 flink sql 识别 用 sql-client create 了一个 kafka table: CREATE TABLE order_status ( out_order_code VARCHAR, intput_date TIMESTAMP(3), owner_code VARCHAR, status INT ) WITH ( 'connector.type' = 'kafka',. 'format.type' = 'json', 'format.derive-schema' = 'true' ) 然后往 kafka 这个 topic 发送消息:{"out_order_code":"MAMICK2020031048","input_date":"2020-03-11T13:00:00.123Z","owner_code":"WDTLEN04","status":90} input_date 在 sql-clinet 端始终是 NULL. 我把 发送的 input_date 改成 1583828700240 "2020-03-11 13:00:00" "2020-03-11 13:00:00.000" 也都不行。 这个 TIMESTAMP(3)在JSON 中应该写成什么样子呢? 谢谢, 王磊 wangl...@geekplus.com.cn
FlinkSQL 1.10 DDL无法指定水印
DDL语句如下: CREATE TABLE ods_usage_naga_dsp_filter ( request row<`value` row> >>>, event_ts as to_timestamp(concat(`date`,`time`),'-MM-ddHH:mm:ss'), WATERMARK FOR event_ts AS event_ts - interval '60' second )WITH ( 'connector.type' = 'kafka', 'format.fail-on-missing-field'='false', 'connector.version' = 'universal', 'connector.topic' = 'xxfilter', 'connector.startup-mode' = 'latest-offset', 'connector.properties.zookeeper.connect'='xx:2181', 'connector.properties.bootstrap.servers'='xx:9092', 'connector.properties.group.id'='xx_test', 'update-mode' = 'append', 'format.type' = 'json' ); 启动程序报错: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: From line 3, column 37 to line 3, column 42: Unknown identifier 'date' at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3, column 37 to line 3, column 42: Unknown identifier 'date' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:236) at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407) at org.apache.calcite.sql.SqlFunction.validateCall(SqlFunction.java:200) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5304) at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:116) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:943) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930) at org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:490) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) at com.cootek.streaming.flink.SQLSubmit$.callCreateTable(SQLSubmit.scala:31) at com.cootek.streaming.flink.SQLSubmit$.callCommand(SQLSubmit.scala:15) at com.cootek.streaming.FlinkBootstrap$$anonfun$1.app
关于Flink 命令行参数广播的问题
Hi,大家好! 遇到一个问题,在使用flink run 提交任务时命令行参数也是就那些被main方法接收的那些,不知道如何在taskManager获取到。导致必须从外部系统(hdfs,文件)来读取例如ES,mysql等连接配置。Flink是否有这种广播命令行参数的能力,如果没有是否会考虑加入这个特性。 Best Aven
Flink有类似storm主动fail的机制吗?
Hi, Flink有类似storm主动fail的机制吗? 没有的话,有什么好的实现方案吗?比如用状态存储失败的记录? 感谢您的回复 | | Sun.Zhu | | 邮箱:17626017...@163.com | Signature is customized by Netease Mail Master
[SURVEY] 您在使用什么数据变更同步工具(CDC)?
Hi, 大家好, Flink 社区目前正在对接一些 CDC (Change Data Capture) 工具,以期在下个版本能支持读取和处理常见的 binlog 数据,所以需要调研下大家目前主要使用的 CDC 工具是什么。 欢迎大家填下问卷调查,您的反馈对我们非常重要,谢谢! http://apacheflink.mikecrm.com/wDivVQ1 也欢迎大家在这个邮件下讨论关于 Flink 对接 CDC 的一些想法、需求、期望。 Best, Jark