Re: computed column转为timestamp类型后进行窗口聚合报错

2020-12-10 文章 Danny Chan
有木有尝试补充 watermark 语法

jun su  于2020年12月11日周五 上午10:47写道:

> hi all,
>
> flink 1.11.0版本, 使用computed column将long类型字段转为timestamp后进行时间窗口聚合,报如下错误:
>
> ddl:
>
> CREATE TABLE source(
> occur_time BIGINT,
> rowtime AS longToTimestamp(occur_time)
> ) WITH ('connector' = 'filesystem','format' = 'orc','path' =
> '/path/to/data')
>
> 报错信息:
>
> Caused by: java.lang.IllegalArgumentException: field [$f0] not found; input
> fields are: [occur_time]
> at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:402)
> at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:385)
> at
>
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.timeFieldIndex(AggregateUtil.scala:720)
> at
>
> org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.transformTimeSlidingWindow(BatchExecWindowAggregateRule.scala:161)
> at
>
> org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.onMatch(BatchExecWindowAggregateRule.scala:111)
> at
>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217)
> ... 27 more
>
> --
> Best,
> Jun Su
>


Re: flink 1.12如何使用RateLimiter

2020-12-09 文章 Danny Chan
您好 请问是什么场景呢 ?限速的目的是什么 ?

18757141558 <18757141...@163.com> 于2020年12月9日周三 下午6:49写道:

> 在源码中找到 FlinkConnectorRateLimiter  和  GuavaFlinkConnectorRateLimiter
> kafka相关的类中没有找到这些配置
> 请问如何在api中使用RateLimiter(不修改源码方式)


Re: Flink sql 无法用!=

2020-11-15 文章 Danny Chan
是的 <> 是 SQL 标准推荐的用法。

jindy_liu <286729...@qq.com> 于2020年11月16日周一 下午2:24写道:

> 用<>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象

2020-11-15 文章 Danny Chan
DataSet 已经是社区准备 deprecate 的 API 了,不建议再使用。1.12 版本后推荐统一使用 DataStream,使用
sqlQuery 接口拿到 table 对象后转成 DataStream。

Asahi Lee <978466...@qq.com> 于2020年11月13日周五 下午4:05写道:

> BatchTableEnvironment对象可以进行table to dataset; dataset to table
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> danny0...@apache.org;
> 发送时间:2020年11月10日(星期二) 下午2:43
> 收件人:"user-zh"
> 主题:Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象
>
>
>
> 拿 BatchTableEnvironment 对象作什么用处呢 ? 我们有 TableEnvironmentInternal 但是不推荐使用。
>
> Asahi Lee <978466...@qq.com 于2020年11月9日周一 下午5:09写道:
>
>  是的,BatchTableEnvironment 对象
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:
> 
> "user-zh"
> 
> <
>  danny0...@apache.orggt;;
>  发送时间:nbsp;2020年11月9日(星期一) 中午12:34
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象
> 
> 
> 
>  gt;
>  gt; BatchTableEnvironment 环境
> 
> 
>  是说nbsp; BatchTableEnvironment 对象吗
> 
>  Asahi Lee <978466...@qq.comgt; 于2020年11月9日周一 上午10:48写道:
> 
>  gt; 你好!
>  gt; amp;nbsp; amp;nbsp; amp;nbsp; 我使用的是flink
>  1.11.2版本,官网的文档中说明blink的batch执行环境以如下方式获取:
>  gt; // ** // BLINK BATCH QUERY //
> **
>  import
>  gt; org.apache.flink.table.api.EnvironmentSettings; import
>  gt; org.apache.flink.table.api.TableEnvironment;
> EnvironmentSettings
>  bbSettings
>  gt; =
>  gt;
> 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>  gt; TableEnvironment bbTableEnv =
>  gt;
> 
> TableEnvironment.create(bbSettings);除过上述的方法之外,是否还有其他方式获取到blink的batch执行环境?而我需要的是BatchTableEnvironment环境,该如何获取?


Re: Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-12 文章 Danny Chan
如果 gap 溢出只是少数 record 满足,用 window 性能确实不好,可以考虑用传统的 kv 记录状态

Lei Wang  于2020年11月12日周四 下午9:17写道:

> 用 session windown 确实能满足功能:
>
>
> robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x,
> y) -> y);
>
> 按照这种写法, 我理解 window state 中只保存了最近的一条记录。
>
>
> 正常情况下 robot 都是会上报日志的,也就是说我这个 window 正常情况下会一直被保存下去。我不清楚会不会有性能影响。
>
>
>
> On Thu, Nov 12, 2020 at 5:25 PM hailongwang <18868816...@163.com> wrote:
>
> >
> >
> >
> > 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
> > 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。
> >
> >
> >
> >
> > 在 2020-11-12 14:34:59,"Danny Chan"  写道:
> > >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
> > >
> > >Lei Wang  于2020年11月11日周三 下午2:03写道:
> > >
> > >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
> > >>
> > >> 比如
> > >> robot1   2020-11-11 12:00:00 msginfo
> > >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00
> > 就发出报警呢?
> > >>
> > >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
> > >>
> > >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
> > >>
> > >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
> > >> 我们必须 按 robotId 做 keyBy
> > >>
> > >> 求大神指教。
> > >>
> > >> 谢谢,
> > >> 王磊
> > >>
> >
>


Re: Flink sql查询NULL值错误

2020-11-11 文章 Danny Chan
是的 Flink SQL 现在还不支持隐式类型,需要手动设置 NULL 的类型 SQL 才能通过编译。

丁浩浩 <18579099...@163.com> 于2020年11月10日周二 下午8:52写道:

> 感谢大佬!!!
>
> > 在 2020年11月10日,下午8:22,hailongwang <18868816...@163.com> 写道:
> >
> > Hi,
> >
> >
> > 需要将 null cast 成某个具体的值,比如:
> > if(type=1,2,cast(null as int))
> >
> >
> > Best,
> > Hailong
> > 在 2020-11-10 19:14:44,"丁浩浩" <18579099...@163.com> 写道:
> >> Select
> >>  id,
> >>  name,
> >>  if(type=1,2,null)
> >> From
> >>  user ;
> >> 当我执行上面的sql的时候提示我
> >> [ERROR] Could not execute SQL statement. Reason:
> >> org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of
> ‘NULL’
> >> 是无法将null展示吗?
> >
> >
> >
>
>
>


Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

2020-11-11 文章 Danny Chan
感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑

Lei Wang  于2020年11月11日周三 下午2:03写道:

> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
>
> 比如
> robot1   2020-11-11 12:00:00 msginfo
> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00 就发出报警呢?
>
> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
>
> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
>
> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
> 我们必须 按 robotId 做 keyBy
>
> 求大神指教。
>
> 谢谢,
> 王磊
>


Re: Flink SQL传递性

2020-11-09 文章 Danny Chan
创建 view ?

amen...@163.com  于2020年11月10日周二 下午3:28写道:

> hi everyone,
>
> Flink SQL有没有上一个SQL的输出是下一个SQL的输入的业务场景思路?
> 比如说KafkaSource -> SQL_1 -> SQL_2 -> MysqlSink,一整个链起来,作为一个任务提交运行~
>
> best,
> amenhub
>


Re: Flink sql tinyint类型使用in 报错

2020-11-09 文章 Danny Chan
好的 了解

丁浩浩 <18579099...@163.com> 于2020年11月10日周二 下午3:19写道:

> 就比如我要用flink cdc 接入mysql表,一般都是直接将mysql表的DDL稍加修改然后在flink sql中创建,一般都不会考虑到
> 类型转换的问题就直接沿用mysql中的类型,当然sql也是一样的。同样的sql在满足语法正确性的情况下,mysql中能跑,而flinksql中无法跑,
> 当然可以通过显示类型转化来完成,但是能提供自动转化会更好的提供易用性。
>
> > 在 2020年11月10日,下午2:51,Danny Chan  写道:
> >
> > 暂时还没有 你们是什么场景需要用到隐式类型
> >
> > 丁浩浩 <18579099...@163.com> 于2020年11月10日周二 下午2:45写道:
> >
> >> 请问有没有计划加入隐式类型自动转化呢
> >>
> >>> 在 2020年11月10日,下午2:35,Jark Wu  写道:
> >>>
> >>> 是的。Flink 目前还不支持隐式类型转换。
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>> On Tue, 10 Nov 2020 at 14:28, hailongwang <18868816...@163.com> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>>
> >>>> 从你的报错来看,是 in 不支持隐式 CAST。
> >>>> 你要么可以把 type 定义成 INT,要不把后面的值 CAST 成  TINYINT。
> >>>>
> >>>>
> >>>> Best,
> >>>> Hailong Wang
> >>>>
> >>>> 在 2020-11-10 10:41:47,"丁浩浩" <18579099...@163.com> 写道:
> >>>>> 我使用flink sql cdc取连接 mysql表的时候,当我的mysql表type这个字段类型是tinyint类型时 使用type
> >>>> in(1,2,3,4,5)会报以下的错误,只有当我把字段类型改成int的时候才能使用in,这是符合预期的吗,当字段类型不匹配的时候
> flink
> >>>> sql不会自动转换类型吗?
> >>>>>
> >>>>> [ERROR] Could not execute SQL statement. Reason:
> >>>>> org.codehaus.commons.compiler.CompileException: Line 6, Column 88: No
> >>>> applicable constructor/method found for actual parameters "int";
> >> candidates
> >>>> are: "org.apache.flink.table.runtime.util.collections.ByteHashSet()
> >>>>
> >>
> >>
> >>
>
>
>


Re: SQL从1.9迁移到1.11的问题

2020-11-09 文章 Danny Chan
通过 Table 操作流程的 DAG 现在不再会缓存到底层的 exec env 中,为了避免 transformations
污染,所以是拿不到的,但是内部代码我们仍然是先拼接 StreamGraph 然后直接通过 exec env 提交。

izual  于2020年10月30日周五 下午5:04写道:

> hi,Community:
>
>
> 我们目前使用的是 flink 1.9.1 执行 SQL 任务,主要使用了以下几种接口:
> 1. sqlQuery sqlUpdate: 执行表的创建、查找和写入
> 2. toAppendStream/toRetractStream:将表转换为流后,通过 DataStream.addSink(new
> RichSinkFunction )写入
> 3. registerDataStream:将流注册为表,下一步使用 sqlQuery/sqlUpdate 读写该表
>
>
> 最后通过 env.execute() 或者 tableEnv.execute() 执行:通过 RichSinkFunction.invoke 或者
> sqlUpdate(DML) 更新到存储,这两种输出形式都可能多次调用。
>
>
> 看到文档里,这部分接口 [1][2] 的行为有些变化,实际使用1.11后,有几处困惑想请教:
>
>
> 1. 如果预期混用 SQL/DataStream 的接口,我目前按照3里的介绍,使用 sqlUpdate,然后通过 tEnv.execute()
> 来输出。具体的,程序设置两个输出,分别是 RichSinkFunction.invoke 以及 sqlUpdate,观察到只有 sqlUpdate
> 更新了数据,RichSinkFunction 没有执行。如果希望同时输出的话,是必须将 RichSinkFunction.invoke
> 的部分也都实现为 StreamTableSink 么,是否有其他成本较低的迁移方式?如果按照 1.11 区分 env/tableEnv
> 的思路,这种情况怎么实现更加合理?
> 2. 对于这种情况,env.getExecutionPlan 获取的只是调用 DataStream 接口的 DAG 图,如果要获取 Table
> 操作流程的 DAG,应该通过 tableEnv 的哪个接口获取?
>
>
> 1.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/release-notes/flink-1.11.html#corrected-execution-behavior-of-tableenvironmentexecute-and-streamtableenvironmentexecute-flink-16363
> 2.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> 3.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query
>
>


Re: Flink sql tinyint类型使用in 报错

2020-11-09 文章 Danny Chan
暂时还没有 你们是什么场景需要用到隐式类型

丁浩浩 <18579099...@163.com> 于2020年11月10日周二 下午2:45写道:

> 请问有没有计划加入隐式类型自动转化呢
>
> > 在 2020年11月10日,下午2:35,Jark Wu  写道:
> >
> > 是的。Flink 目前还不支持隐式类型转换。
> >
> > Best,
> > Jark
> >
> > On Tue, 10 Nov 2020 at 14:28, hailongwang <18868816...@163.com> wrote:
> >
> >> Hi,
> >>
> >>
> >> 从你的报错来看,是 in 不支持隐式 CAST。
> >> 你要么可以把 type 定义成 INT,要不把后面的值 CAST 成  TINYINT。
> >>
> >>
> >> Best,
> >> Hailong Wang
> >>
> >> 在 2020-11-10 10:41:47,"丁浩浩" <18579099...@163.com> 写道:
> >>> 我使用flink sql cdc取连接 mysql表的时候,当我的mysql表type这个字段类型是tinyint类型时 使用type
> >> in(1,2,3,4,5)会报以下的错误,只有当我把字段类型改成int的时候才能使用in,这是符合预期的吗,当字段类型不匹配的时候 flink
> >> sql不会自动转换类型吗?
> >>>
> >>> [ERROR] Could not execute SQL statement. Reason:
> >>> org.codehaus.commons.compiler.CompileException: Line 6, Column 88: No
> >> applicable constructor/method found for actual parameters "int";
> candidates
> >> are: "org.apache.flink.table.runtime.util.collections.ByteHashSet()
> >>
>
>
>


Re: flink sql LATERAL TABLE

2020-11-09 文章 Danny Chan
可以提供详细信息,比如 sql 语句是啥 报错堆栈给出来

赵帅  于2020年11月10日周二 下午2:48写道:

> 请教一下,flink sql,lateral table如何配合insert overwrite使用,直接select不报错,但是一旦insert
> overwrite就报错


Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象

2020-11-09 文章 Danny Chan
拿 BatchTableEnvironment 对象作什么用处呢 ? 我们有 TableEnvironmentInternal 但是不推荐使用。

Asahi Lee <978466...@qq.com> 于2020年11月9日周一 下午5:09写道:

> 是的,BatchTableEnvironment 对象
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> danny0...@apache.org;
> 发送时间:2020年11月9日(星期一) 中午12:34
> 收件人:"user-zh"
> 主题:Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象
>
>
>
> 
>  BatchTableEnvironment 环境
>
>
> 是说 BatchTableEnvironment 对象吗
>
> Asahi Lee <978466...@qq.com 于2020年11月9日周一 上午10:48写道:
>
>  你好!
>  nbsp; nbsp; nbsp; 我使用的是flink
> 1.11.2版本,官网的文档中说明blink的batch执行环境以如下方式获取:
>  // ** // BLINK BATCH QUERY // **
> import
>  org.apache.flink.table.api.EnvironmentSettings; import
>  org.apache.flink.table.api.TableEnvironment; EnvironmentSettings
> bbSettings
>  =
> 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>  TableEnvironment bbTableEnv =
> 
> TableEnvironment.create(bbSettings);除过上述的方法之外,是否还有其他方式获取到blink的batch执行环境?而我需要的是BatchTableEnvironment环境,该如何获取?


Re: flink 1.11.2 如何获取blink计划下的BatchTableEnvironment对象

2020-11-08 文章 Danny Chan
>
> BatchTableEnvironment 环境


是说  BatchTableEnvironment 对象吗

Asahi Lee <978466...@qq.com> 于2020年11月9日周一 上午10:48写道:

> 你好!
>我使用的是flink 1.11.2版本,官网的文档中说明blink的batch执行环境以如下方式获取:
> // ** // BLINK BATCH QUERY // ** import
> org.apache.flink.table.api.EnvironmentSettings; import
> org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings
> =
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment bbTableEnv =
> TableEnvironment.create(bbSettings);除过上述的方法之外,是否还有其他方式获取到blink的batch执行环境?而我需要的是BatchTableEnvironment环境,该如何获取?


Re: flink sql kafka connector with avro confluent schema registry support

2020-11-08 文章 Danny Chan
支持的,参考 code https://github.com/apache/flink/pull/12919/commits

陈帅  于2020年11月3日周二 上午8:44写道:

> flink sql 1.11.2 支持 confluent schema registry 下 avro格式的kafka connector吗?
> 官网没找到相关资料。有的话请告知或者提供一下示例,谢谢!
>


Re: flinksql 不支持 % 运算

2020-10-28 文章 Danny Chan
%是非标准的 SQL 语法,不推荐使用。

Benchao Li  于2020年10月26日周一 下午9:26写道:

> 1.11的话通过配置是无法实现的。可以把这个pr[1] cherry-pick到1.11的分支上编译一下来实现1.11上使用%
>
> [1] https://github.com/apache/flink/pull/12818
>
> 夜思流年梦  于2020年10月26日周一 下午4:16写道:
>
> > flink 版本1.11
> > 目前flink-sql 好像不支持取余运算,会报错:
> > 比如:SELECT * FROM Orders WHERE a % 2 = 0
> > Percent remainder '%' is not allowed under the current SQL conformance
> > level
> >
> >
> > 看了下flink 的issue ,已经有人碰到过了,说是要1.12版本修复
> >
> >
> >
> >
> > 想问下:如果再1.11版本,flink-sql 要怎么操作才能支持 % 运算呢? 可以通过修改配置文件来实现么?比如flink-conf.yaml
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: 请问批处理有反压嘛?

2020-10-28 文章 Danny Chan
有的,反压机制借助于 runtime 的网络 buffer,和批流无关。

请叫我雷锋 <854194...@qq.com> 于2020年10月27日周二 下午8:02写道:

> 如题


Re: 求教,如果想要从flinksql中提取出列级别的血缘关系,请问有什么好的方案吗

2020-10-22 文章 Danny Chan
你可以了解下 Calcite 的 metadata 系统,其中有一个 metadata: RelMdColumnOrigins 可以拿到 column 
的血缘,前提是你要拿到 SQL 对的关系表达式树。

Best,
Danny Chan
在 2020年10月20日 +0800 PM8:43,dawangli ,写道:
> 求教,如果想要从flinksql中提取出列级别的血缘关系,请问有什么好的方案吗


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 文章 Danny Chan
应该是碰到节点 cycle 引用了,导致优化 rule 一直重复重复触发,可以将 debug 日志打开,看下是哪个 rule 
被频繁触发了,之前修过一个类似的问题[1],可以参考下

[1] https://issues.apache.org/jira/browse/CALCITE-3121

Best,
Danny Chan
在 2020年9月23日 +0800 AM10:23,jun su ,写道:
> hi godfrey,
> 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的
>
> godfrey he  于2020年9月23日周三 上午10:09写道:
>
> > Hi Jun,
> >
> > 可能是old planner缺少一些rule导致遇到了corner case,
> > blink planner之前解过一些类似的案例。
> >
> > jun su  于2020年9月23日周三 上午9:53写道:
> >
> > > hi godfrey,
> > >
> > > 刚看了下, blink应该也会用hep , 上文说错了
> > >
> > > jun su  于2020年9月23日周三 上午9:19写道:
> > >
> > > > hi godfrey,
> > > > 我用了最新代码的blink没这个问题, 我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> > > >
> > > > godfrey he  于2020年9月22日周二 下午8:58写道:
> > > >
> > > > > blink planner 有这个问题吗?
> > > > >
> > > > > jun su  于2020年9月22日周二 下午3:27写道:
> > > > >
> > > > > > hi all,
> > > > > >
> > > > > > 环境: flink-1.9.2 flink table planner
> > > > > > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> > > > > >
> > > > > > 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> > > 导致进程OOM
> > > > > > ---
> > > > > > 代码:
> > > > > >
> > > > > > fbTableEnv.registerTableSource("source",orcTableSource)
> > > > > >
> > > > > > val select = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source
> > ")
> > > > > >
> > > > > > fbTableEnv.registerTable("selectTable",select)
> > > > > >
> > > > > > val t1 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> > > selectTable
> > > > > > where Auth_Roles like 'a%'")
> > > > > > fbTableEnv.registerTable("t1",t1)
> > > > > >
> > > > > > val t2 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> > > > > > Target_UserSid= 'b'")
> > > > > > fbTableEnv.registerTable("t2",t2)
> > > > > >
> > > > > > val t3 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> > > > > > Thread_ID= 'c'")
> > > > > > fbTableEnv.registerTable("t3",t3)
> > > > > >
> > > > > > val t4 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> > > > > > access_path= 'd'")
> > > > > > fbTableEnv.registerTable("t4",t4)
> > > > > >
> > > > > > val t5 = fbTableEnv.sqlQuery("select
> > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> > > > > > action= 'e'")
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best,
> > > > > > Jun Su
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Jun Su
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
>
>
> --
> Best,
> Jun Su


Re: Row和RowData的区别

2020-09-09 文章 Danny Chan
Row 是暴露给 DataStream 用户用的,里面可以设置 RowKind,RowData 是 Table 内部的数据结构,在一些场景序列化会有提升,使用 
Flink SQL 会直接应用上 RowData,当然高级用户想直接用 RowData 也是可以的,1.11 的新版 connector API 就是将 
RowData 暴露给了 connector 开发者。

Best,
Danny Chan



> 在 2020年9月9日,下午1:51,刘首维  写道:
> 
> Hi all,
> 
>
> 请问`org.apache.flink.types.Row`和`org.apache.flink.table.data.RowData`的区别和联系是?



Re: 消费kafka数据乱序问题

2020-09-07 文章 Danny Chan
你的 source 消费单/多 partition 数据相对 partition 来说仍然是有序的 只是 source 和下游 operator 如果存在数据 
shuffle 就会破坏顺序,目前想保序,一种办法是 source 的并发和下游保持一致。

Best,
Danny Chan
在 2020年9月4日 +0800 PM4:40,smq <374060...@qq.com>,写道:
> 大家好
>  
> 现在碰到一个关于处理乱序的问题,业务场景是银行余额的更新,数据源是kafka,有一个账户余额字段,sink到kudu,更新客户余额.
> 如果对于同一个账户的多笔操作出现乱序,可能会导致客户余额不对。比如一个客户账户有100块,先消费100,然后存100,按照正常的数据处理顺序,帐户余额应该还是100的,假如说先处理了存钱,存上之后余额是100-100+100=100,然后处理消费100的那条数据,由于消费在前,这时这条数据对应的余额是100-100=0。
>  这样的话就出现了统计余额错误,请问想要按照事件时间处理数据是不是可以先keyby,然后用watermark.


Re: flink sql多层view嵌套,字段not found

2020-09-03 文章 Danny Chan
这是一个已知问题,社区版本已经修复了 [1],不过还有一个后续 PR https://github.com/apache/flink/pull/13293,待 
merge

[1] https://issues.apache.org/jira/browse/FLINK-18750

Best,
Danny Chan
在 2020年9月3日 +0800 PM6:41,Lin Hou ,写道:
> Hi,
>
> 请教一个通过sql-client提交flink sql遇到的关于嵌套view,当嵌套第二层时,查询时会报找不到字段的问题。
> 元数据已经建好,简述如下:
>
> 1.建嵌套的view:
>
> create temporary view temp_app_impression_5min as
> select
> argService as arg_service,
> timeLocal as time_local,
> mid as mid,
> vipruid as vipruid,
> activity as activity,
> LOWER(activityProperty) as activity_property
> from
> vipdwd.clean;
>
> create temporary view v1_1 as
> select
> get_json_object(activity_property, '$.page') as impression_page,
> SPLIT_INDEX(good, '_', 0) as sales_no,
> good,
> imp.*
> from
> temp_app_impression_5min as imp, lateral table (
> string_split(
> COALESCE(
> get_json_object(activity_property, '$.goodslist'),
> ''
> ),
> ','
> )
> ) as T(good)
> where
> activity = 'active';
>
> 2. 描述view:
>
> 两个view建好没有问题,
> 同时 desc v1_1 如下:
>
> Flink SQL> desc v1_1;root
> |-- impression_page: STRING
> |-- sales_no: VARCHAR(2000)
> |-- good: STRING
> |-- arg_service: STRING
> |-- time_local: BIGINT
> |-- mid: STRING
> |-- vipruid: BIGINT
> |-- activity: STRING
> |-- activity_property: STRING
>
> 可见字段activity_property存在于v1_1中,
>
> 3.执行:
>
> 但是当执行:
> select
> *
> from
> v1_1;
>
> 出错:[ERROR] Could not execute SQL statement. Reason:
>
> org.apache.calcite.sql.validate.SqlValidatorException: Column
> 'activity_property' not found in any table
>
> 一层view的时候查询没有问题,当嵌套view的时候就会报这个错,哪怕生成这个view的时候指定这个字段都不行:
>
> create temporary view v1_1 as
> select
> get_json_object(activity_property, '$.page') as impression_page,
> SPLIT_INDEX(good, '_', 0) as sales_no,
> good,
> imp.*,
>
> activity_property
>
> from
> temp_app_impression_5min as imp, lateral table (
> string_split(
> COALESCE(
> get_json_object(activity_property, '$.goodslist'),
> ''
> ),
> ','
> )
> ) as T(good)
> where
> activity = 'active';
>
>
> 请问有解决办法么,或者是我们用的方式不对。
>
>
> 完整调用栈日志:
>
> org.apache.flink.table.client.gateway.SqlExecutionException:
> Invalidate SQL statement.
> at 
> org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:99)
> ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at 
> org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:90)
> ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at 
> org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:257)
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> Caused by: org.apache.flink.table.api.ValidationException: SQL
> validation failed. From line 3, column 84 to line 3, column 102:
> Column 'activity_property' not found in any table
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
> ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204)
> ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52)
> ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at 
> org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
> ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59)
> ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55)
> ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> ~[flink-table_2.11-1.11-SNAPSHOT.jar:1.1

Re: Flink SQL 任务乱码问题

2020-09-03 文章 Danny Chan
SQL 文本是什么编码 ?有尝试过 UTF8 编码 ?

Best,
Danny Chan
在 2020年9月3日 +0800 PM5:04,LakeShen ,写道:
> Hi 社区,
>
> 我的一个 Flink SQL 任务,版本为 1.10,代码中使用了中文值,具体如下:
>
> select xxx, case when a = 'a' then '你好' when a = 'b' then '你好呀' end as va
> from xxx ;
>
> 然后会把这个结果输出,最后发现 va 的值乱码了,也就是中文乱码。
>
> 目前有什么比较好的解决方法吗。
>
> Best,
> LakeShen


Re: flink-1.11 sql写ES6问题

2020-09-02 文章 Danny Chan
Es connector 的包放到哪个目录下了 ?

Best,
Danny Chan
在 2020年9月2日 +0800 PM3:38,酷酷的浑蛋 ,写道:
> Caused by: java.lang.ClassNotFoundException: 
> org.elasticsearch.client.RestClientBuilder
> Flink sql 写入ES:总是报上面的错误,我检查了依赖并没有冲突这个类啊,而且我解压了jar,里面是有这个类的啊
>
>
>


Re: 【闫云鹏】Flink sql 写入es实现object嵌套形式

2020-09-02 文章 Danny Chan
[
{
f:f
},{
g:g
}
]

可否用 Array> 来表达?

Best,
Danny Chan
在 2020年9月2日 +0800 PM3:54,user-zh@flink.apache.org,写道:
>
> [
> {
> f:f
> },{
> g:g
> }
> ]


Re: flink-1.11连接hive或filesystem问题

2020-09-01 文章 Danny Chan
1.  为了保证数据正确性 stream 写文件依赖了 checkpoint 机制,你可以将你的间隔时间和 checkpoint 时间保持一致
2. 按逗号分隔是说 CSV format ?

Best,
Danny Chan
在 2020年8月31日 +0800 PM8:53,酷酷的浑蛋 ,写道:
> 1. Create hive表(...)with(...)
> 我发现写入hive只能根据checkpoint去提交分区?可以按照文件大小或者间隔时间来生成吗?
>
>
> 2. Create table (connector=filesystem,format=json) with(…)
> 这种方式format只能等于json? 我怎么按照分隔符写入hdfs?


Re: flink1.11连接mysql问题

2020-08-30 文章 Danny Chan
这个问题已经有 issue 在追踪了 [1]

[1] https://issues.apache.org/jira/browse/FLINK-12494

Best,
Danny Chan
在 2020年8月28日 +0800 PM3:02,user-zh@flink.apache.org,写道:
>
> CommunicationsException


Re: flink1.11时间函数

2020-08-30 文章 Danny Chan
对应英文的 deterministic function 可以更好理解些 ~

Best,
Danny Chan
在 2020年8月29日 +0800 PM6:23,Dream-底限 ,写道:
> 哦哦,好吧,我昨天用NOW的时候直接报错告诉我这是个bug,让我提交issue,我以为这种标示的都是函数功能有问题的
>
> Benchao Li  于2020年8月28日周五 下午8:01写道:
>
> > 不确定的意思是,这个函数的返回值是动态的,每次调用返回可能不同。
> > 对应的是确定性函数,比如concat就是确定性函数,只要输入是一样的,它的返回值就永远都是一样的。
> > 这个函数是否是确定性的,会影响plan的过程,比如是否可以做express reduce,是否可以复用表达式结果等。
> >
> > Dream-底限  于2020年8月28日周五 下午2:50写道:
> >
> > > hi
> > >
> > > UNIX_TIMESTAMP()
> > >
> > > NOW()
> > >
> > > 我这面想使用flink的时间戳函数,但是看官方文档对这两个函数描述后面加了一个此功能不确定,这个此功能不确定指的是这两个时间函数不能用吗
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-30 文章 Danny Chan
能否提供下完整的 query,方便追踪和排查 ~

Best,
Danny Chan
在 2020年8月31日 +0800 AM10:58,zhuyuping <1050316...@qq.com>,写道:
> 同样出现了这个问题,SQL 使用中,请问是什么原因,翻转tumble窗口当使用mapview 进行操作时候,状态不断的增长
> 好像不能清理一样,因为正常的window 窗口 窗口结束后会清理状态,现在的情况是1秒的翻转tumble窗口,满满的从最开始的1m 过一个小时变成了1g
> 不断的无限增长下去
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL 有提供参数支持大小写不敏感吗?

2020-08-27 文章 Danny Chan
Hi ~

要开启大小写不敏感涉及的东西比较多,例如词法解析,catalog 以及部分访问表达式 (a.b.c 或者 a[‘f0’]),社区已经有 issue 跟进了 
[1],预期在 1.12 版本可以解决。

[1] https://issues.apache.org/jira/browse/FLINK-16175

Best,
Danny Chan
在 2020年8月27日 +0800 PM3:52,Tianwang Li ,写道:
> 我们对用户在使用习惯了Hive之后,在写一些flink sql对时候经常碰到大小写对困扰。
> 使用对是默认对catalog。
>
> ```
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column
> 'uid' not found in any table; did you mean 'Uid'?
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
> ... 28 more
>
> ```
>
>
>
> --
> **
> tivanli
> **


[需求调研] Stream SQL window join 支持

2020-08-27 文章 Danny Chan
大家好 ~

这里做一个 window-join[1] 的需求调研, window-join 是 Flink DataStream 上已经有的 feature. 
目标是决策是否要在 SQL 上支持该特性, 例如, tumbling window join 语法可能长这样:

```sql
select ... window_start, window_end
from TABLE(
  TUMBLE(
    DATA => TABLE table_a,
    TIMECOL => DESCRIPTOR(rowtime),
    SIZE => INTERVAL '1' MINUTE)) tumble_a
  [LEFT | RIGHT | FULL OUTER] JOIN TABLE(
  TUMBLE(
    DATA => TABLE table_b,
    TIMECOL => DESCRIPTOR(rowtime),
    SIZE => INTERVAL '1' MINUTE)) tumble_b
on tumble_a.col1 = tumble_b.col1 and ...
```

目前了解到的情况是一些公司 interval join 用的比较多,window join 的 case 还是比较少的, 希望这里可以有更多的反馈。

希望您可以分享 window join 的一些使用案例,以及为什么选用 window-join (而不是 interval join)。

感谢 ~

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html

Best,
Danny Chan


Re: DDL中声明主键会报类型不匹配

2020-08-26 文章 Danny Chan
是的 加了 primary key constraint 后会强制将类型转成 Not nullable,这个是 primary key 的特性导致的。

Best,
Danny Chan
在 2020年8月20日 +0800 PM5:19,xiao cai ,写道:
> Hi:
> flink版本1.11.0 connector为kafka
> DDL中声明某个字段为primary key时,会报类型不匹配,去掉primary key constraint就可以正常执行。
> 把shop_id设置为 varchar not null也不行。
>
>
> org.apache.flink.table.api.ValidationException: Type STRING NOT NULL of table 
> field 'shop_id' does not match with the physical type STRING of the 'shop_id' 
> field of the TableSource return type.
>
>
> SQL如下:
> create table source_0 (
> `shop_id` varchar,
> `user_id` bigint,
> `category_id` int,
> `ts` bigint,
> `proc_time` as PROCTIME(),
> `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, '-MM-dd 
> HH:mm:ss')),
> watermark for event_time AS event_time,
> PRIMARY KEY (shop_id, user_id) NOT ENFORCED
> ) with (
> 'connector.type' = 'kafka',
>
>
> )


Re: Flink SQL Map类型字段大小写不敏感支持

2020-08-26 文章 Danny Chan
您好 现在 Flink SQL 是大小写敏感的 目前还没有计划开启大小写不敏感。

Best,
Danny Chan
在 2020年8月21日 +0800 AM11:04,zilong xiao ,写道:
> 如题,在业务中有遇到过在Map类型字段中有表示同一个含义但是大小写不一致的情况,比如有个Map字段 my_map,里面可能存在key
> aB,ab,Ab,在SQL中取值时能否能做到大小写不敏感呢,my_map['ab'] 能取所有相关key的值


Re: Table api son schema

2020-08-11 文章 Danny Chan
Flink 1.11 支持 RAW 作为 sql 类型,基于此,你可以自己扩展 SE/DE 的逻辑实现部分动态解析[1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw

Best,
Danny Chan
在 2020年8月11日 +0800 PM3:40,Zhao,Yi(SEC) ,写道:
> <>
>
>
> 如上图,field api被标注过期。替换写法被注释掉,使用注视掉的写法会报错如下。
>
> Exception in thread "main" org.apache.flink.table.api.TableException: A raw 
> type backed by type information has no serializable string representation. It 
> needs to be resolved into a proper raw type.
>    at 
> org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:97)
>    at org.apache.flink.table.descriptors.Schema.field(Schema.java:88)
>    at jobs.IpGapUserFt2.main(IpGapUserFt2.java:83)
>
> 不清楚有啥解决方法吗?
>
>
> 其次,我这边鼓捣了半天,发现使用json schema貌似也没办法实现整个表的动态结构。
> 我业务中json实际如下:
> {
> “d”:{
>   “key”: value
> …. … .  .. ..// 此处key动态扩展
> }
> }
> 我大概想法是d作为一个field,类型是map(但好像不支持map?必须用row)。用row呢,又必须指定其所有field,就不是动态扩展字段了。


Re: 关于FlinkSQL的文档中flinksql的connector和catalog划分以及hive catalog和hive connector是否必须配合使用的问题。

2020-08-10 文章 Danny Chan
你好 ~

1. 你是只文档结构吗 ?catalog 是 flink SQL 管理表元数据信息的组件,通过注册 catalog 用户可以直接访问 catalog 
中的已存表,当然用户也可以通过 CREATE TABLE DDL 来创建对应的 connector 表
2. 访问 hive metastore 中的表示一定要用 hive catalog 的,如果是新建临时表(不持久化),也可以使用内置的 catalog

Best,
Danny Chan
在 2020年8月10日 +0800 PM8:14,Zhao,Yi(SEC) ,写道:
> 1 为什么flinksql 1.11中,JDBC 
> Catalog通过简单的链接转给了connector,catalog和connector并不是同一个概念。我认为应该将jdbc 
> connectior和jdbc catalog分开放入各自目录。
>
> 2 为什么flinksql1.11中,connector部分没有hive connector。而是在hive 
> integration部分,以及catalogs中介绍。而且在 Table API & SQL/Hive Integration/Hive Read & 
> Write 部分,第一句是“Using the HiveCatalog and Flink’s connector to Hive, Flink can 
> read and write from Hive data as an alternative to Hive’s batch 
> engine.”。难道不使用hivecatalog就不能读取hive数据?是这个意思嘛。感觉好像意思是hive读取特别,这种情况必须使用hive 
> catalog。不可以使用jdbc catalog,但使用hive connector嘛?