Re: Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误

2022-05-27 文章 godfrey he
使用了什么state backend?能描述一下产生上述问题的步骤吗?
是直接跑作业就产生上述错误,还是作业有基于sp启动,或者是中途重启过?

zhangbin  于2022年5月27日周五 13:34写道:

>
> Retry
>  回复的原邮件 
> 发件人 zhangbin 
> 发送日期 2022年05月27日 10:11
> 收件人 godfre...@gmail.com 
> 抄送人 user-zh 
> 主题 回复:Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误
> 确认了下邮件的内容的确是异常信息,这里却把附件的内容展示出来了。我再重新发一次。
>
> 我们在使用Flink-1.15.0 cumulate window + grouping sets 执行SQL过程中,发现个问题,报如下错误:
>java.lang.IllegalArgumentException: key group from 93 to 95 does not
> contain 478
>at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>at
> org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl.registerEventTimeWindowTimer(WindowTimerServiceImpl.java:61)
>at
> org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl.registerEventTimeWindowTimer(WindowTimerServiceImpl.java:27)
>at
> org.apache.flink.table.runtime.operators.aggregate.window.processors.AbstractWindowAggProcessor.processElement(AbstractWindowAggProcessor.java:164)
>at
> org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator.processElement(SlicingWindowOperator.java:221)
>at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>at java.lang.Thread.run(Thread.java:748)
>
>


Re: Flink-1.15.0 在使用cumulate window报key group 不在范围内的错误

2022-05-26 文章 godfrey he
把异常栈也发出来吧

zhangbin  于2022年5月26日周四 22:54写道:


Re: 1.13.5版本sql大小64k限制bug

2022-05-26 文章 godfrey he
确认一下是sql文本超过64k?具体的异常是什么?

Yun Tang  于2022年5月26日周四 10:06写道:
>
> Hi
>
> 请使用英文在dev社区发送邮件。另外关于使用方面的问题,建议向user-zh 频道发送,已经帮你转发到相关邮件列表了。
>
>
> 祝好
> 唐云
> 
> From: Lose control ./ <286296...@qq.com.INVALID>
> Sent: Tuesday, May 24, 2022 9:15
> To: dev 
> Subject: 1.13.5版本sql大小64k限制bug
>
>  请问各位大神,1.13.5版本sql大小64k限制如何修改啊?谢谢


Re: flink创建视图后,SELECT语句后使用OPTIONS报错

2022-02-17 文章 godfrey he
这个包:flink-table_2.11-1.13.0.jar (legacy planner 的包)

Best,
Godfrey

liangjinghong  于2022年2月17日周四 09:59写道:
>
> 感谢老师的回复,然而我的部署环境下的lib中没有您说的这个包,请问是要移除哪个包呢?
>
> 我的lib下有的包:
> flink-csv-1.13.0.jar
> flink-dist_2.11-1.13.0.jar
> flink-json-1.13.0.jar
> flink-shaded-zookeeper-3.4.14.jar
> flink-sql-connector-mysql-cdc-2.1.1.jar
> flink-table_2.11-1.13.0.jar
> flink-table-blink_2.11-1.13.0.jar
> log4j-1.2-api-2.12.1.jar
> log4j-api-2.12.1.jar
> log4j-core-2.12.1.jar
> log4j-slf4j-impl-2.12.1.jar
> -邮件原件-
> 发件人: godfrey he [mailto:godfre...@gmail.com]
> 发送时间: 2022年2月16日 16:47
> 收件人: user-zh 
> 主题: Re: flink创建视图后,SELECT语句后使用OPTIONS报错
>
> Hi liangjinghong,
>
> 原因是 blink planner 中引入并修改了 SqlTableRef 类, 而 Legacy planner 中没有引入 SqlTableRef
> 类,从而导致加载到了Calcite 中 SqlTableRef (该类有问题)。
> 解决方案:如果只使用到了blink planner,可以把legacy planner 的包冲lib下移除。
>
> Best,
> Godfrey
>
> liangjinghong  于2022年2月14日周一 17:26写道:
>
> > 各位老师们好,以下代码在开发环境中可以执行,打包部署后报错:
> >
> > 代码:
> >
> > CREATE VIEW used_num_common
> >
> > (toolName,region,type,flavor,used_num)
> >
> > AS
> >
> > select info.toolName as toolName,r.regionName as
> > region,f.type,f.flavor,count(1) as used_num from
> >
> > tbl_schedule_job/*+ OPTIONS('server-id'='1001-1031') */ job
> >
> > join
> >
> > tbl_schedule_task/*+ OPTIONS('server-id'='2001-2031') */ task
> >
> > on job.jobId = task.jobId
> >
> > join
> >
> > tbl_broker_node/*+ OPTIONS('server-id'='3001-3031') */  node
> >
> > on task.nodeId = node.id
> >
> > join
> >
> > tbl_app_info/*+ OPTIONS('server-id'='4001-4031') */ info
> >
> > on job.appId = info.appId
> >
> > join
> >
> > tbl_region r
> >
> > on node.region/*+ OPTIONS('server-id'='5001-5031') */ = r.region
> >
> > join
> >
> > tbl_flavor/*+ OPTIONS('server-id'='6001-6031') */  f
> >
> > on node.resourcesSpec = f.flavor
> >
> > where job.jobStatus in ('RUNNING','ERROR','INITING')
> >
> > and task.taskStatus in ('RUNNING','ERROR','INITING')
> >
> > and node.machineStatus <> 'DELETED'
> >
> > and toolName is not null
> >
> > group by info.toolName,r.regionName,f.type,f.flavor
> >
> > …
> >
> > 打包部署后报错如下:
> >
> > The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6:
> > SPECIAL
> >
> > 2022-02-08 13:33:39,350 WARN
> > org.apache.flink.client.deployment.application.DetachedApplicationRunn
> > er []
> > - Could not execute application:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error: class org.apache.calcite.sql.SqlSyntax$6:
> > SPECIAL
> >
> > at
> > org.apache.flink.client.program.PackagedProgram.callMainMethod(Package
> > dProgram.java:372)
> > ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> >
> > at
> > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeF
> > orExecution(PackagedProgram.java:222)
> > ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> >
> > at
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:11
> > 4)
> > ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> >
> > at
> > org.apache.flink.client.deployment.application.DetachedApplicationRunn
> > er.tryExecuteJobs(DetachedApplicationRunner.java:84)
> > ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> >
> > at
> > org.apache.flink.client.deployment.application.DetachedApplicationRunn
> > er.run(DetachedApplicationRunner.java:70)
> > ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> >
> > at
> > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$hand
> > leRequest$0(JarRunHandler.java:102)
> > ~[flink-dist_2.11-1.13.0.jar:1.13.0]
> >
> > at
> > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFutu
> > re.java:1700)
> > [?:?]
> >
> > at
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515
> > )
> > [?:?]
> >
> > at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
> >
> > at
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.r
> > un(ScheduledThreadPoolExecutor.java:304)
> > [?:?]
> >
> > at
> > java.util.concurrent.Thre

Re: flink创建视图后,SELECT语句后使用OPTIONS报错

2022-02-16 文章 godfrey he
Hi liangjinghong,

原因是 blink planner 中引入并修改了 SqlTableRef 类, 而 Legacy planner 中没有引入 SqlTableRef
类,从而导致加载到了Calcite 中 SqlTableRef (该类有问题)。
解决方案:如果只使用到了blink planner,可以把legacy planner 的包冲lib下移除。

Best,
Godfrey

liangjinghong  于2022年2月14日周一 17:26写道:

> 各位老师们好,以下代码在开发环境中可以执行,打包部署后报错:
>
> 代码:
>
> CREATE VIEW used_num_common
>
> (toolName,region,type,flavor,used_num)
>
> AS
>
> select info.toolName as toolName,r.regionName as
> region,f.type,f.flavor,count(1) as used_num from
>
> tbl_schedule_job/*+ OPTIONS('server-id'='1001-1031') */ job
>
> join
>
> tbl_schedule_task/*+ OPTIONS('server-id'='2001-2031') */ task
>
> on job.jobId = task.jobId
>
> join
>
> tbl_broker_node/*+ OPTIONS('server-id'='3001-3031') */  node
>
> on task.nodeId = node.id
>
> join
>
> tbl_app_info/*+ OPTIONS('server-id'='4001-4031') */ info
>
> on job.appId = info.appId
>
> join
>
> tbl_region r
>
> on node.region/*+ OPTIONS('server-id'='5001-5031') */ = r.region
>
> join
>
> tbl_flavor/*+ OPTIONS('server-id'='6001-6031') */  f
>
> on node.resourcesSpec = f.flavor
>
> where job.jobStatus in ('RUNNING','ERROR','INITING')
>
> and task.taskStatus in ('RUNNING','ERROR','INITING')
>
> and node.machineStatus <> 'DELETED'
>
> and toolName is not null
>
> group by info.toolName,r.regionName,f.type,f.flavor
>
> …
>
> 打包部署后报错如下:
>
> The main method caused an error: class org.apache.calcite.sql.SqlSyntax$6:
> SPECIAL
>
> 2022-02-08 13:33:39,350 WARN
> org.apache.flink.client.deployment.application.DetachedApplicationRunner []
> - Could not execute application:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: class org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
> ~[flink-dist_2.11-1.13.0.jar:1.13.0]
>
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
> [?:?]
>
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> [?:?]
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> [?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
>
> at java.lang.Thread.run(Thread.java:829) [?:?]
>
> Caused by: java.lang.UnsupportedOperationException: class
> org.apache.calcite.sql.SqlSyntax$6: SPECIAL
>
> at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlSyntax$6.unparse(SqlSyntax.java:116)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlOperator.unparse(SqlOperator.java:329)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:101)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at
> org.apache.calcite.sql.SqlJoin$SqlJoinOperator.unparse(SqlJoin.java:199)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlDialect.unparseCall(SqlDialect.java:453)
> ~[flink-table_2.11-1.13.0.jar:1.13.0]
>
> at org.apache.calcite.sql.SqlCall.unparse(SqlCall.java:104)
> ~[flink-table_2.11-1.13.0.jar:1.13

Re: sql-gateway和jdbc-driver还维护吗?

2022-01-11 文章 godfrey he
Hi Ada,

sql-gateway之前没有维护起来,确实是一个遗憾。
最近我们也关注到大家对batch的兴趣越来越浓,sql-gateway还会继续维护。

btw,非常欢迎分享一下你们使用Flink替换Spark遇到的一些痛点,我们会逐渐去解决这些痛点

Best,
Godfrey

Ada Wong  于2022年1月12日周三 10:09写道:
>
> cc tsreaper and Godfrey He
>
> 文末丶 <809097...@qq.com.invalid> 于2022年1月10日周一 19:39写道:
>
> >
> > 试下 https://github.com/DataLinkDC/dlink 看看能不能满足你的需求
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人:
> > "user-zh"   
> >  
> >  > 发送时间: 2022年1月10日(星期一) 晚上7:32
> > 收件人: "user-zh" >
> > 主题: Re: sql-gateway和jdbc-driver还维护吗?
> >
> >
> >
> > https://github.com/ververica/flink-jdbc-driver
> > https://github.com/ververica/flink-sql-gateway
> >
> > Ada Wong  > >
> > > 我看这俩项目一两年没更新了。想用Flink彻底替换到Spark,这俩项目是刚需,用来替换SparkThriftServer。


Re: 创建表t1的视图v1之后rowtime属性丢失

2021-11-02 文章 godfrey he
可以把具体的sql发出来看看

yidan zhao  于2021年11月2日周二 下午7:06写道:
>
> 如题,我原先基于flink1.11和1.12貌似没这个问题。目前基于1.13出现这个问题。
> 问题描述如下:
> 我t1是kafka表,其中有个属性是event_time是row time属性,然后创建了view v1,通过select  ,
> event_time from t1这样创建。  现在问题是这么创建之后,我基于v1查询报错说aggre.. window只能在time
> attributes上定义。
> 不清楚是版本变化导致,还是我其他地方搞错了呢。


Re: Re: Re: 公司数据密文,实现group by和join

2021-11-01 文章 godfrey he
上传的图片没法显示,通过图床工具或纯文本方式重新发一遍

lyh1067341434  于2021年11月1日周一 上午10:42写道:

> 您好!
>
> 这样好像还是不行,因为group by id ,id还是密文字符串,还是会把id当成字符串处理,所以还是不能正确分组;
> 为了更清楚表达,下面为图示:
>
> 谢谢您!
>
>
>
>
>
>
>
> 在 2021-10-29 10:49:35,"Caizhi Weng"  写道:
> >Hi!
> >
> >你是不是想写这样的 SQL:
> >
> >SELECT id, sum(price) AS total_price FROM (
> >  SELECT T1.id AS id, T2.price AS price FROM T AS T1 INNER JOIN T AS T2 ON
> >decrypt_udf(T1.id, T2.id) = 1
> >) GROUP BY id
> >
> >这个 sql 会输出每个 id 和该 id 属于的分组的总价格。
> >
> >lyh1067341434  于2021年10月29日周五 上午9:41写道:
> >
> >> 您好!
> >>
> >>
> >>   感谢您在百忙之中抽空回复我的邮件,我已经按照您的建议,自定义join函数实现了密文的join,但密文的group by 还是实现不了;
> >>
> >>
> >> 比如 有一张表 a, 表a有
> >> id,price列,数据都是密文,类似这样("MBwEELdR0JDC0OSryuQskeulP8YCCAyJLH7RwmAA");
> >>
> >>
> >> 如果我想求 不同id组的price之和:
> >> 直接使用flink 计算:会把id的分组当成字符串处理,从而导致分组的不正确;
> >> 如果调用密文计算的接口的话,把两个比较的key的密文传进入,会得到1或者0,来判断这两个密文key是否相等,从而分组可以正确;
> >>
> >>
> >>
> >>
> >> 问题:
> >>
> >>
> >> 目前group by分组,不知道在哪里实现调用密文计算的接口,从而传入两个key,来进行分组正确;
> >>我看到api只能指定分组的key是哪一个;
> >>
> >>
> >> 谢谢您!
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2021-10-28 11:09:26,"Caizhi Weng"  写道:
> >> >Hi!
> >> >
> >> >没太明白你的需求。你的需求是不是
> >> >
> >> >1. 调用一个自定义函数,用某一列密文算出一个值 k,用这个 k 作为 join key 或者 group key。
> >> >
> >> >如果是这个需求,只要实现一个 udf 即可。详见 [1]。
> >> >
> >> >2. 调用一个自定义函数,用某两列密文算出一个 true 或 false,如果是 true 说明 join key 匹配。
> >> >
> >> >如果是这个需求,仍然只需要实现一个 udf。join 条件中调用这个 udf 即可。但如果是这个需求,不太明白你期望中的 group by
> >> >是什么样的,因为不能仅通过 true false 就判断哪些数据属于同一个 group。
> >> >
> >> >[1]
> >> >
> >> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/functions/udfs/
> >> >
> >> >lyh1067341...@163.com  于2021年10月27日周三 下午5:20写道:
> >> >
> >> >> 您好:
> >> >>
> >> 目前公司数据都是密文,要进行密文数据的比较或者计算的话,只能调用公司密文计算的接口,去看了下flink的分组和join算子,都只能指定分组的key或者join的key,不知道怎么改写比较的规则,我用mapreduce实现了重写shuffle的比较规则,可以实现密文下的join和group
> >> >> by,对于使用spark和flink算子不知道如何实现。
> >> >>
> >> >> 问题:
> >> >> 请问有啥办法,实现密文下的join和group by操作吗?(在不能解密,只能调用公司密文计算的接口)
> >> >>
> >> >> 谢谢您。
> >> >>
> >> >>
> >> >>
> >> >> 发自 网易邮箱大师
> >>
>
>
>
>
>


Re: k8s session模式SQLclient怎样连接

2021-07-23 文章 godfrey he
我建了一个jira,建议sql client把作业提交到各种集群的方式在文档里写清楚,
可以关注 https://issues.apache.org/jira/browse/FLINK-23483

Best,
Godfrey


Caizhi Weng  于2021年7月23日周五 上午10:12写道:

> Hi!
>
> 可以考虑把 k8s session 的 flink rest api 地址暴露出来,然后客户端把 execution.target 设为
> remote,rest.address 和 rest.port 设为相应地址。
>
> maker_d...@foxmail.com  于2021年7月22日周四 下午9:46写道:
>
> > 大家好,
> > 我将flink部署在k8s集群上,使用官方文档上的session模式进行部署,可以正常提交任务。
> > 现在我想使用sqlclient,在提交任务时提示 :
> > [ERROR] Could not execute SQL statement. Reason:
> > java.net.UnknownHostException: flink-cluster
> > 请问大家,如何使用sqlclient连接k8s上的flink session。
> > flink版本 1.12.4.
> >
> >
> >
> > maker_d...@foxmail.com
> >
>


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 文章 godfrey he
例如 calc merge rule,还有calc,agg等其他相关rule,点比较散。得具体看

jun su  于2020年9月23日周三 上午10:22写道:

> hi godfrey,
> 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的
>
> godfrey he  于2020年9月23日周三 上午10:09写道:
>
> > Hi Jun,
> >
> > 可能是old planner缺少一些rule导致遇到了corner case,
> > blink planner之前解过一些类似的案例。
> >
> > jun su  于2020年9月23日周三 上午9:53写道:
> >
> > > hi godfrey,
> > >
> > > 刚看了下, blink应该也会用hep , 上文说错了
> > >
> > > jun su  于2020年9月23日周三 上午9:19写道:
> > >
> > > > hi godfrey,
> > > > 我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> > > >
> > > > godfrey he  于2020年9月22日周二 下午8:58写道:
> > > >
> > > >> blink planner 有这个问题吗?
> > > >>
> > > >> jun su  于2020年9月22日周二 下午3:27写道:
> > > >>
> > > >> > hi all,
> > > >> >
> > > >> > 环境: flink-1.9.2 flink table planner
> > > >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> > > >> >
> > > >> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> > > 导致进程OOM
> > > >> > ---
> > > >> > 代码:
> > > >> >
> > > >> > fbTableEnv.registerTableSource("source",orcTableSource)
> > > >> >
> > > >> > val select = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source
> > ")
> > > >> >
> > > >> > fbTableEnv.registerTable("selectTable",select)
> > > >> >
> > > >> > val t1 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> > > selectTable
> > > >> > where Auth_Roles like 'a%'")
> > > >> > fbTableEnv.registerTable("t1",t1)
> > > >> >
> > > >> > val t2 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1
> where
> > > >> > Target_UserSid= 'b'")
> > > >> > fbTableEnv.registerTable("t2",t2)
> > > >> >
> > > >> > val t3 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2
> where
> > > >> > Thread_ID= 'c'")
> > > >> > fbTableEnv.registerTable("t3",t3)
> > > >> >
> > > >> > val t4 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3
> where
> > > >> > access_path= 'd'")
> > > >> > fbTableEnv.registerTable("t4",t4)
> > > >> >
> > > >> > val t5 = fbTableEnv.sqlQuery("select
> > > >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4
> where
> > > >> > action= 'e'")
> > > >> >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > Best,
> > > >> > Jun Su
> > > >> >
> > > >>
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Jun Su
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
>
>
> --
> Best,
> Jun Su
>


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 文章 godfrey he
Hi Jun,

可能是old planner缺少一些rule导致遇到了corner case,
blink planner之前解过一些类似的案例。

jun su  于2020年9月23日周三 上午9:53写道:

> hi godfrey,
>
> 刚看了下, blink应该也会用hep , 上文说错了
>
> jun su  于2020年9月23日周三 上午9:19写道:
>
> > hi godfrey,
> > 我用了最新代码的blink没这个问题,  我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> >
> > godfrey he  于2020年9月22日周二 下午8:58写道:
> >
> >> blink planner 有这个问题吗?
> >>
> >> jun su  于2020年9月22日周二 下午3:27写道:
> >>
> >> > hi all,
> >> >
> >> > 环境: flink-1.9.2 flink table planner
> >> > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> >> >
> >> >   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> 导致进程OOM
> >> > ---
> >> > 代码:
> >> >
> >> > fbTableEnv.registerTableSource("source",orcTableSource)
> >> >
> >> > val select = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")
> >> >
> >> > fbTableEnv.registerTable("selectTable",select)
> >> >
> >> > val t1 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> selectTable
> >> > where Auth_Roles like 'a%'")
> >> > fbTableEnv.registerTable("t1",t1)
> >> >
> >> > val t2 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> >> > Target_UserSid= 'b'")
> >> > fbTableEnv.registerTable("t2",t2)
> >> >
> >> > val t3 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> >> > Thread_ID= 'c'")
> >> > fbTableEnv.registerTable("t3",t3)
> >> >
> >> > val t4 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> >> > access_path= 'd'")
> >> > fbTableEnv.registerTable("t4",t4)
> >> >
> >> > val t5 = fbTableEnv.sqlQuery("select
> >> > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> >> > action= 'e'")
> >> >
> >> >
> >> >
> >> > --
> >> > Best,
> >> > Jun Su
> >> >
> >>
> >
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best,
> Jun Su
>


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-22 文章 godfrey he
blink planner 有这个问题吗?

jun su  于2020年9月22日周二 下午3:27写道:

> hi all,
>
> 环境: flink-1.9.2 flink table planner
> 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
>
>   发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别, 导致进程OOM
> ---
> 代码:
>
> fbTableEnv.registerTableSource("source",orcTableSource)
>
> val select = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from source ")
>
> fbTableEnv.registerTable("selectTable",select)
>
> val t1 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from selectTable
> where Auth_Roles like 'a%'")
> fbTableEnv.registerTable("t1",t1)
>
> val t2 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1 where
> Target_UserSid= 'b'")
> fbTableEnv.registerTable("t2",t2)
>
> val t3 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2 where
> Thread_ID= 'c'")
> fbTableEnv.registerTable("t3",t3)
>
> val t4 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3 where
> access_path= 'd'")
> fbTableEnv.registerTable("t4",t4)
>
> val t5 = fbTableEnv.sqlQuery("select
> Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4 where
> action= 'e'")
>
>
>
> --
> Best,
> Jun Su
>


Re: Flink 1.11 jdbc查pg失败

2020-09-17 文章 godfrey he
据我所知,目前flink是大小写不敏感,但是pg是大小写敏感。这问题暂时没法解

wdmcode  于2020年9月10日周四 上午9:44写道:

> Hi Jimmy
>
> 给字段加双引号试试呢
> Select “F1”,”F2” from xxx.xxx;
>
>
> 发件人: Jimmy Zhang
> 发送时间: Thursday, September 10, 2020 9:41 AM
> 收件人: user-zh@flink.apache.org
> 主题: Flink 1.11 jdbc查pg失败
>
> flink 1.11用jdbc查询pg表时,pg表的字段是大写 flink会把字段转成小写,而导致查询失败,有大佬知道这个问题吗
>
> Best,
> Jimmy Signature is customized by Netease Mail Master
>
>


Re: flink sql执行sql语句无法执行的错误-No operators defined in streaming topology. Cannot execute.

2020-09-17 文章 godfrey he
能提供完整的demo吗?

me  于2020年9月11日周五 下午6:54写道:

> 1.flink 版本是1.11.1
> streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamBlinkSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> streamTableEnv = StreamTableEnvironment.create(streamEnv,
> streamBlinkSettings)
>
> 2.我在执行sql后需要转为datastream所以最后使用的是dataStreamEnv.execute("SqlPlatformRealTime”)
> sql的结果Table会转为datastream然后addSink保存到kafka中。
>
>
>  原始邮件
> 发件人: silence
> 收件人: user-zh
> 发送时间: 2020年9月11日(周五) 18:49
> 主题: Re: flink sql执行sql语句无法执行的错误-No operators defined in streaming
> topology. Cannot execute.
>
>
> 没有insert语句也就是没有sink无法触发计算 -- Sent from:
> http://apache-flink.147419.n8.nabble.com/


Re: 多线程模式下使用Blink TableEnvironment

2020-09-17 文章 godfrey he
TableEnvironment 不是多线程安全的。

btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗?

Jeff Zhang  于2020年9月14日周一 下午12:10写道:

> 参考zeppelin的做法,每个线程里都调用这个
>
>
> https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111
>
>
> jun su  于2020年9月14日周一 上午11:54写道:
>
> > hi all,
> >
> > 多线程模式下执行sql , 在非聚合sql时报了如下错误:
> >
> > Caused by: java.lang.NullPointerException
> >   at java.util.Objects.requireNonNull(Objects.java:203)
> >   at
> >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> >   at
> >
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> >
> >
> >
> >
> 已经用RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))
> > 解决
> >
> >
> > 但是执行聚合sql时 , 仍然会报错, 请问有办法临时fix?
> >
> > Caused by: java.lang.NullPointerException
> > at scala.Predef$.Double2double(Predef.scala:365)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate.computeSelfCost(FlinkLogicalAggregate.scala:81)
> > at
> >
> >
> org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.getNonCumulativeCost(RelMdPercentageOriginalRows.java:174)
> > at
> > GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown
> > Source)
> > at
> GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown
> > Source)
> >
> > --
> > Best,
> > Jun Su
> >
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: flink hive批量作业报FileNotFoundException

2020-09-17 文章 godfrey he
cc @Rui Li 

李佳宸  于2020年9月14日周一 下午5:11写道:

> 大家好~我执行batch table的作业写入hive时,会出现FileNotFoundException的错误。找不到.staging文件
> 版本是1.11.1
> Caused by: java.io.FileNotFoundException: File
>
> hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144
> does not exist.
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0]
> at
>
> org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> 在standalone模式下没有这个问题,on yarn 的per job模式下部分job就会出现这个问题
>


Re: 关于flink cdc 测试时遇到的几种问题,比较疑惑,各位大佬求帮助

2020-09-17 文章 godfrey he
sql
client的默认并发为1,如果没有在sql-client-defaults.yaml显示设置parallelism,代码里面的默认并发为1.因此需要显示的设置
sql-client-defaults.yaml的parallelism

Jark Wu  于2020年9月15日周二 上午11:43写道:

> Hi,
>
> 请问
> 1. 有完整的异常栈吗? 你是怎么从 ck 恢复的呢? 用的什么命令?
> 2. 是的。因为 source 只能并发1。先写到 kafka,再从 kafka 同步是可以的。
>
> Best,
> Jark
>
> On Fri, 11 Sep 2020 at 17:56, 引领  wrote:
>
> >
> >
> > 1、在checkpoint后,用ck恢复时报错。
> > org.apache.kafka.connect.errors.ConnectException:
> >
> com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException:
> > Failed to deserialize data ofEventHeaderV4{timestamp=1599815908000,
> > eventType=EXT_UPDATE_ROWS, serverId=501, headerLength=19,
> dataLength=25879,
> > nextPosition=721073164, flags=0}
> > 2、关于flink cdc读取数据后,并执行join【加载维表的操作】后,写入mysql中。并发调不上去,一直是1
> > 我已在配置文件中做了相应的设置,包括sql-client中
> > taskmanager.numberOfTaskSlots: 5 # The parallelism used for
> > programs that did not specify and other parallelism.
> >  parallelism.default: 5
> >
> >
> > 我的sql是:
> >
> >
> > Insert into orders Select * from order o join sku s FOR SYSTEM_TIME
> as
> > of o.proc_time s  on o.sku_id = s.id
> >
> >
> > 提前感谢各位大佬回复
> >
> >
> >
> >
> >
> >
>


Re: Flink SQL create view问题

2020-09-17 文章 godfrey he
已知问题,已fix:https://issues.apache.org/jira/browse/FLINK-18750

guaishushu1...@163.com  于2020年9月16日周三 下午2:32写道:

> 当create_view和LATERAL TABLE 共用时 会出现字段找不到异常
>
> 语法:
> CREATE TABLE billing_data_test (
> message  STRING
>
>
> create view v1 as
> select T.*
> from billing_data_test,
> LATERAL TABLE(SplitUdtf(message)) as T(scate1,  scate2,  belong_local1,
> ssrc2,  gift,  coupon,  local_type);
>
> 异常:
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column
> 'message' not found in any table
> (com.dataplatform.flink.util.FlinkDebugThread)
> [2020-09-16 14:32:04,857] INFO ---  at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> (com.dataplatform.flink.util.FlinkDebugThread)
>
>
>
>
>
> guaishushu1...@163.com
>


Re: Flink SQL TableSource复用问题,相同数据源聚合多个指标,引擎创建多个相同的数据源

2020-09-17 文章 godfrey he
blink 根据每个算子的digest信息来判断是否可以reuse(只有digest完全一样才可以reuse),
例如table source节点,算子信息包括:表名,select的字段信息,其他push down之后的信息等。
你可以通过explain的方式把plan打印出来看看,source的digest是否一样

Jingsong Li  于2020年9月17日周四 下午2:45写道:

> 你仔细看看这两个数据源是不是有什么不同
> 只要有一点不同,Blink 就 reuse 不了
>
> On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai <154434...@qq.com> wrote:
>
> > 场景描述:
> > 通过Flink SQL创建两个Kafka数据源,对数据源去重处理,然后Union ALL合并,并创建临时视图
> > 然后通过Flink SQL读取临时视图进行聚合计算指标,结果写入Redis
> > 问题描述:
> > Flink SQL 解析器会为每个聚合运算创建相同的两个数据源
> >
> > 在下面Blink Plan的配置说明中,引擎应该会优化复用相同的数据源
> > - table.optimizer.reuse-source-enabled
> > - table.optimizer.reuse-sub-plan-enabled
> >
> > 请问下,有人碰到类似问题么?
> >
> >
> >
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> --
> Best, Jingsong Lee
>


Re: flink 1.9 关于回撤流的问题

2020-09-17 文章 godfrey he
可以用flink提供的“去重"语法来支持

[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D

Shengkai Fang  于2020年9月15日周二 下午4:02写道:

> hi, 我对于使用upsert
>
> kafka能够省state感到疑惑。金竹老师提供的实现只是丢掉了delete消息,你的下游表依旧需要手动去重才可以得到准确的结果才对啊。如果每个下游表都手动去重这样子还能省state吗?
>
> star <3149768...@qq.com> 于2020年6月8日周一 上午9:38写道:
>
> > 非常感谢,正是我想要的。也谢谢金竹老师的分享!
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人: "Sun.Zhu"<17626017...@163.com>;
> > 发送时间: 2020年6月7日(星期天) 凌晨0:02
> > 收件人: "user-zh@flink.apache.org" > 抄送: "user-zh@flink.apache.org" > 主题: 回复:flink 1.9 关于回撤流的问题
> >
> >
> >
> > Hi,star
> > 金竹老师发过一篇文章,重写了KafkaConnector的实现,支持upsert模式,可以参考下[1]
> >
> >
> > [1]https://mp.weixin.qq.com/s/MSs7HSaegyWWU3Fig2PYYA
> > | |
> > Sun.Zhu
> > |
> > |
> > 17626017...@163.com
> > |
> > 签名由网易邮箱大师定制
> >
> >
> > 在2020年06月3日 14:47,star<3149768...@qq.com> 写道:
> > 大家好,
> >
> >
> >
> >
> 在做实时统计分析的时候我将基础汇总层做成了一个回撤流(toRetractStream)输出到kafka里(因为后面很多实时报表依赖这个流,所以就输出了)
> > 问题是这个kafka里到回撤流还能转成flink 的RetractStream流吗,转成后我想注册成一张表,然后基于这个表再做聚合分析
> >
> >
> >
> >
> > 谢谢
>


Re: flink-sql-gateway还会更新吗

2020-08-30 文章 godfrey he
已更新至flink1.11.1

godfrey he  于2020年8月24日周一 下午9:45写道:

> 我们会在这周让flink-sql-gateway支持1.11,请关注
> 另外,sql-client支持gateway模式,据我所知目前还没计划。
>
> shougou <80562...@qq.com> 于2020年8月24日周一 上午9:48写道:
>
>> 也有同样的问题,同时也问一下,sql client 计划在哪个版本支持gateway模式?多谢
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>


Re: flink1.11 sql api使用per job模式提交后,客户端退出

2020-08-24 文章 godfrey he
如果是通过TableEnvironment#execute方法提交需要设置execution.attached=true, 或者是通过flink
cli的 加上-d
如果是通过TableEnvironment#executeSql方法提交,需要代码里显示的等待作业结束:
TableResult tableResult = tEnv.executeSql(xxx);
// wait job finished
tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();

lijufeng2016 <920347...@qq.com> 于2020年8月25日周二 上午9:34写道:

> flink1.11 sql api使用per
> job模式提交后,客户端退出,程序在yarn正常运行,客户端与yarn断开连接,与有没有办法不让客户端断开?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-sql-gateway还会更新吗

2020-08-24 文章 godfrey he
我们会在这周让flink-sql-gateway支持1.11,请关注
另外,sql-client支持gateway模式,据我所知目前还没计划。

shougou <80562...@qq.com> 于2020年8月24日周一 上午9:48写道:

> 也有同样的问题,同时也问一下,sql client 计划在哪个版本支持gateway模式?多谢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink log4j2 问题

2020-08-24 文章 godfrey he
检查一下log4j2相关的版本,参考[1]

[1]
https://stackoverflow.com/questions/50970960/facing-issue-with-log4j2-java-lang-exceptionininitializererror

guaishushu1...@163.com  于2020年8月24日周一 上午11:18写道:

> SQL提交会出现这种问题???
> Caused by: java.lang.IllegalArgumentException: Initial capacity must be at
> least one but was 0
> at
> org.apache.logging.log4j.util.SortedArrayStringMap.(SortedArrayStringMap.java:102)
> at
> org.apache.logging.log4j.core.impl.ContextDataFactory.createContextData(ContextDataFactory.java:109)
> at
> org.apache.logging.log4j.core.impl.ContextDataFactory.(ContextDataFactory.java:57)
> ... 29 more
>
>
>
> guaishushu1...@163.com
>


Re: ScalarFunction 访问 state

2020-08-18 文章 godfrey he
看看deduplication语法[1] 是否满足你的需求

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication

forideal  于2020年8月17日周一 下午12:13写道:

> Hi,
>
> 最近我有一个使用 Flink SQL 做简单的数据去重的需求,想使用 Flink 的 `ScalarFunction`,通过阅读
> API 发现 FunctionContext context 并不支持访问 state。
> 我准备使用 Guava cache 做,不知道小伙伴有没有更好的建议哈!感谢。
>
>
> Best,forideal


Re: flink 1.11 SQL idea调试无数据也无报错

2020-08-18 文章 godfrey he
> GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)
是要1个小时的window结束才会出结果。
你可以通过把window缩小或者设置early-fire来提早看到数据
table.exec.emit.early-fire.enabled=true
table.exec.emit.early-fire.delay=xx

>  手动拿到那个executeSql的返回的TableResult,然后去   wait job finished
这个是为了防止本地ide里执行时executeSql执行结束后进程退出导致job也强制结束

DanielGu <610493...@qq.com> 于2020年8月17日周一 下午4:04写道:

> hi,
> flink 1.11 SQL idea调试 有其他伙伴了解吗?求赐教.
> 最近调试卡在这里..有点出不来了
> 十分感谢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: sql client 并行度问题

2020-08-18 文章 godfrey he
通过设置 table.exec.hive.infer-source-parallelism=false, 把hive source自动并发推导关闭,
然后设置table.exec.resource.default-parallelism修改并发度

18579099...@163.com <18579099...@163.com> 于2020年8月17日周一 下午5:51写道:

> sql client 读取hive表,hive表中一共有21个文件需要读取,sql
> client提示需要21个并行度,但是我slot并没有这么多。有什么办法可以把并行度改小?
>
>
>
> 18579099...@163.com
>


Re: flink 1.11.1 sql client 流式 join 出现预期之外的含 null 行

2020-08-13 文章 godfrey he
可以把原始的计算结果打印出来,执行 set execution.result-mode=changelog
(如果source有delete消息,可能会出现null值)


LittleFall <1578166...@qq.com> 于2020年8月13日周四 下午3:33写道:

> mysql 的建表语句
> use test;
> create table base (
> id int primary key,
> location varchar(20)
> );
> create table stuff(
> id int primary key,
> b_id int,
> name varchar(20)
> );
>
> flink sql client 的建表语句
> create table base (
> id int primary key,
> location varchar(20)
> )WITH (
>'connector' = 'kafka',
>'topic' = 'example',
>'properties.group.id' = 'testGroup',
>'scan.startup.mode' = 'latest-offset',
>'properties.bootstrap.servers' = 'localhost:9092',
>'format' = 'canal-json'
> );
> create table stuff(
> id int primary key,
> b_id int,
> name varchar(20)
> )WITH (
>'connector' = 'kafka',
>'topic' = 'example',
>'properties.group.id' = 'testGroup',
>'scan.startup.mode' = 'latest-offset',
>'properties.bootstrap.servers' = 'localhost:9092',
>'format' = 'canal-json'
> );
>
> flink 查询语句
> select distinct stuff.id s_id, base.id b_id, base.location, stuff.name
> from stuff inner join base
> on stuff.b_id = base.id;
>
> mysql 插入语句
> insert into base values (1, 'beijing');
> insert into stuff values (1, 1, 'zz');
>
> flink 结果
> <
> http://apache-flink.147419.n8.nabble.com/file/t858/2020-08-13_15-12-36_%E7%9A%84%E5%B1%8F%E5%B9%95%E6%88%AA%E5%9B%BE.png>
>
>
> mysql 执行同样的查询的结果:
> +--+--+--+--+
> | s_id | b_id | location | name |
> +--+--+--+--+
> |1 |1 | beijing  | zz   |
> +--+--+--+--+
> 1 row in set (0.01 sec)
>
>
> 而且有时候连结果正确的行都不会出现,只会出现含 null 的行。
>
> 求助大家。。。
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-13 文章 godfrey he
sql client 中通过 -j 或者 -l 的指定的包会被随着job提交的时候一起上传到jm。

Zhao,Yi(SEC)  于2020年8月13日周四 下午5:11写道:

> A是10机器集群(HA模式,独立集群),B作为提交机器。
> 从我实验效果来看,我是先启动一个sql-client的cli,如下命令:
> ./bin/sql-client.sh embedded -l $(pwd)/libs_sql -l $(pwd)/libs_udf
> 其中libs_sql中有:flink-connector-kafka_2.12-1.10.0.jar
> flink-connector-kafka-base_2.12-1.10.0.jar  flink-jdbc_2.12-1.10.0.jar
> flink-json-1.10.0.jar。然后A集群所有机器没有加这些包(Flink部署目录lib中没有)。A集群上其他任务提交的包中jar应该不致于会影响到我sql提交的任务。
>
> 结论是,我libs_sql中没有flink-json、flink-connector-kafka等的时候,提交sql任务会报错。加了的时候,提交sql任务不报错。
> 所以感觉貌似提交sql任务会将启动sql-client时候指定的lib相关包都上传吗?
> 
>
> 在 2020/8/13 下午3:10,“Jeff Zhang” 写入:
>
> 你的10台机器是flink standalone还是 yarn集群 ?
> 如果是yarn集群,那么只要把依赖放到机器A就可以了,如果是standalone的话,就要在B集群上部署了依赖了。
>
> 另外你可以用zeppelin来提交flink sql 作业,zeppelin提供很多种添加第三方依赖的方法,具体可以参考文档
> https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s
> 或者加入钉钉群讨论,钉钉群号: 32803524
>
>
> Zhao,Yi(SEC)  于2020年8月13日周四 下午1:02写道:
>
> > 背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
> > 现在比较混乱,哪些jar需要放到A,哪些放到B。
> >
> >
> > (1) kafka ssl
> >
> 证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。
> >
> > (2)
> >
> flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。
> >
> >
> >
> >
> 总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?
> >
> > 目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。
> >
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>


Re: flink 1.11 使用sql将流式数据写入hive

2020-08-11 文章 godfrey he
 tEnv.executeSql(insertSql); 是异步提交完任务就返回了,
如果是IDE里运行的话话,进程就直接退出导致job也就结束了。需要需要等到job结束,
目前可以通过下面这种方式
TableResult result = tEnv.executeSql(insertSql);
result..getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();

另外  tEnv.executeSql(insertSql); 已经提交作业了,不需要调用  bsEnv.execute("test");

liya...@huimin100.cn  于2020年8月11日周二 下午3:20写道:

> 下面粘的就是主程序代码
>
> 能在hive里建表,创建的TemporaryView也有数据,但是tEnv.executeSql(insertSql)这块好像没执行,往新建的hive表里插入数据没反应。求助
>
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv,
> bsSettings);DataStream dataStream = bsEnv.addSource(new
> MySource());//构造hive catalog
> String name = "myhive";
> String defaultDatabase = "default";
> String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a
> local path
> String version = "1.1.0";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,
> version);
> tEnv.registerCatalog("myhive", hive);
> tEnv.useCatalog("myhive");
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
> tEnv.createTemporaryView("users", dataStream);
>
> Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts,
> '-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM
> users");
>
>
> tEnv.toRetractStream(result3, TypeInformation.of(new
> TypeHint>(){})).print("res");//
> 如果hive中已经存在了相应的表,则这段代码省略
> //String hiveSql = "CREATE TABLE fs_table (\n" +
> // "  user_id STRING,\n" +
> // "  order_amount DOUBLE \n" +
> // ") partitioned by (dt string,h string,m string) \n"
> +
> // "stored as textfile \n" +
> // "TBLPROPERTIES (\n" +
> // "
> 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
> // "  'sink.partition-commit.delay'='0s',\n" +
> // "
> 'sink.partition-commit.trigger'='partition-time',\n" +
> // "  'sink.partition-commit.policy.kind'='metastore'"
> +
> // ")";
> //tEnv.executeSql(hiveSql);
>
> String insertSql = "insert into table fs_table partition (dt,h,m)
> SELECT userId, amount, DATE_FORMAT(ts, '-MM-dd') dt, DATE_FORMAT(ts,
> 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users";
>
> tEnv.executeSql(insertSql);
>
> bsEnv.execute("test");
>
>
> liya...@huimin100.cn
>


Re: flink sql状态清理问题

2020-08-09 文章 godfrey he
配置了 TableConfig 中的 minIdleStateRetentionTime 和 maxIdleStateRetentionTime 吗?

Benchao Li  于2020年8月10日周一 上午10:36写道:

> Hi,
>
> 最好也说一下使用的Flink版本以及对应的SQL,这样可以让其他同学容易复现这个问题。
>
> op <520075...@qq.com> 于2020年8月10日周一 上午10:27写道:
>
> > Hi
> >     在使用flink sql的过程中遇到如下情况,在配置了如下选项后:
> >   val config = tableConfig.getConfiguration()
> >     config.setString("table.exec.mini-batch.enabled",
> > "true")
> >  
>   config.setString("table.exec.mini-batch.allow-latency",
> > "5s")
> >     config.setString("table.exec.mini-batch.size", "20")
> >
> 无论使用FsStateBackend还是RocksDBStateBackend都无法清理掉空闲状态,运行时可以观察到checkpoint目录下状态大小持续增长;
> > 同一套代码,把这几个配置删掉后,运行时观察checkpoint目录下状态大小不会一直增长,能保持在一个范围内请问这是什么原因?谢谢
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: 来自郭华威的邮件

2020-08-09 文章 godfrey he
BatchTableEnvironmentImpl 属于 old planner,
缺少 flink-table-planner_${scala.binary.version}.jar 的依赖

郭华威  于2020年8月10日周一 上午10:21写道:

> flink1.11.1 使用tableApi  报错:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Create BatchTableEnvironment failed.
> at
> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:517)
> at
> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:471)
> at
> yueworld.worldCount.BatchWordCount_tablesql.main(BatchWordCount_tablesql.java:24)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at
> org.apache.flink.table.api.bridge.java.BatchTableEnvironment.create(BatchTableEnvironment.java:509)
> ... 2 more
> 但是相关的依赖都有的,下面是pom文件:
> 
>UTF-8
>1.11.1
>5.1.40
>2.11
>2.11.12
>1.8
>${java.version}
>${java.version}
> 
>
> 
>
>
>   org.apache.flink
>   flink-java
>   ${flink.version}
>
>
> 
> 
>   org.apache.flink
>
> flink-table-api-java-bridge_${scala.binary.version}
>   ${flink.version}
>
>
> 
> 
>   org.apache.flink
>
> flink-table-planner-blink_${scala.binary.version}
>   ${flink.version}
>
>
> 
> 
>   org.apache.flink
>   flink-table-common
>   ${flink.version}
>
>
> 
> 
>   org.apache.flink
>   flink-connector-jdbc_${scala.binary.version}
>   ${flink.version}
>
>
> 
> 
>   org.apache.flink
>
> flink-sql-connector-kafka-0.11_${scala.binary.version}
>   ${flink.version}
>
>
> 
> 
>   org.apache.flink
>   flink-json
>   ${flink.version}
>
>
> 
> 
>   mysql
>   mysql-connector-java
>   ${mysql.version}
>
>
> 
> 
>   org.apache.flink
>   flink-clients_2.11
>   1.11.1
>
>
>
>   org.apache.flink
>   flink-connector-jdbc_2.11
>   1.11.1
>
>
>
>   org.apache.flink
>   flink-streaming-scala_2.11
>   1.11.0
>
>
> 
> 
>   org.slf4j
>   slf4j-log4j12
>   1.7.7
>   runtime
>
>
>   log4j
>   log4j
>   1.2.17
>   runtime
>
> 
>
>
>
>
>
>


Re: Re: Re: FLINK SQL view的数据复用问题

2020-08-05 文章 godfrey he
目前sql-client还不支持。关于纯SQL文本statement set的支持,
目前社区已经达成语法的一致意见,应该后续会慢慢的支持。

kandy.wang  于2020年8月5日周三 下午10:43写道:

>
>
>
>
>
>
> @ godfrey
> 你说的这种StatementSet 提交方式,在sql-client提交任务的时候不支持吧? 可以给加上么。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-04 19:36:56,"godfrey he"  写道:
> >调用 StatementSet#explain() 把结果打出来看看是否因 Deduplicate的digest不一样导致的没法复用
> >
> >kandy.wang  于2020年8月4日周二 下午6:21写道:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> @ godfrey
> >> thanks。刚试了一下,source -> Deduplicate  ->
> >> GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source +
> >> Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。
> >>
> >>
> >> 在 2020-08-04 17:26:02,"godfrey he"  写道:
> >> >blink planner支持将多sink的query优化成尽量复用重复计算部分。
> >> >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
> >> >
> >> >kandy.wang  于2020年8月4日周二 下午5:20写道:
> >> >
> >> >> FLINK SQL view相关问题:
> >> >> create view order_source
> >> >>
> >> >> as
> >> >>
> >> >> select order_id, order_goods_id, user_id,...
> >> >>
> >> >> from (
> >> >>
> >> >> ..  proctime,row_number() over(partition by order_id,
> >> >> order_goods_id order by proctime desc) as rownum
> >> >>
> >> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
> >> properties.group.id'='flink_etl_kafka_hbase',
> >> >> 'scan.startup.mode'='latest-offset') */
> >> >>
> >> >> ) where  rownum = 1 and  price > 0;
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> insert into hive.temp_dw.day_order_index select rowkey,
> ROW(cast(saleN
> >> as
> >> >> BIGINT),)
> >> >>
> >> >> from
> >> >>
> >> >> (
> >> >>
> >> >> select order_date as rowkey,
> >> >>
> >> >> sum(amount) as saleN,
> >> >>
> >> >> from order_source
> >> >>
> >> >> group by order_date
> >> >>
> >> >> );
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> insert into hive.temp_dw.day_order_index select rowkey,
> ROW(cast(saleN
> >> as
> >> >> BIGINT))
> >> >>
> >> >> from
> >> >>
> >> >> (
> >> >>
> >> >> select order_hour as rowkey,sum(amount) as saleN,
> >> >>
> >> >>
> >> >>
> >> >> from order_source
> >> >>
> >> >> group by order_hour
> >> >>
> >> >> );
> >> >> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer
> group。
> >> >> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  ->
> sink
> >> >> 2
> >> >>
> >> >>
> >> >> 本意是想通过view  order_source
> >> >> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
> >> >>
> >> >>
> >>
>


Re: Re: FLINK SQL view的数据复用问题

2020-08-04 文章 godfrey he
调用 StatementSet#explain() 把结果打出来看看是否因 Deduplicate的digest不一样导致的没法复用

kandy.wang  于2020年8月4日周二 下午6:21写道:

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> @ godfrey
> thanks。刚试了一下,source -> Deduplicate  ->
> GlobalGroupAggregate,在souce端确实是复用了。但是Deduplicate 端是没有复用呢?理论上source +
> Deduplicate 都是view里的逻辑,都应该复用才对。就是感觉复用的还不够多呢。
>
>
> 在 2020-08-04 17:26:02,"godfrey he"  写道:
> >blink planner支持将多sink的query优化成尽量复用重复计算部分。
> >1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
> >
> >kandy.wang  于2020年8月4日周二 下午5:20写道:
> >
> >> FLINK SQL view相关问题:
> >> create view order_source
> >>
> >> as
> >>
> >> select order_id, order_goods_id, user_id,...
> >>
> >> from (
> >>
> >> ..  proctime,row_number() over(partition by order_id,
> >> order_goods_id order by proctime desc) as rownum
> >>
> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
> properties.group.id'='flink_etl_kafka_hbase',
> >> 'scan.startup.mode'='latest-offset') */
> >>
> >> ) where  rownum = 1 and  price > 0;
> >>
> >>
> >>
> >>
> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
> as
> >> BIGINT),)
> >>
> >> from
> >>
> >> (
> >>
> >> select order_date as rowkey,
> >>
> >> sum(amount) as saleN,
> >>
> >> from order_source
> >>
> >> group by order_date
> >>
> >> );
> >>
> >>
> >>
> >>
> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
> as
> >> BIGINT))
> >>
> >> from
> >>
> >> (
> >>
> >> select order_hour as rowkey,sum(amount) as saleN,
> >>
> >>
> >>
> >> from order_source
> >>
> >> group by order_hour
> >>
> >> );
> >> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。
> >> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  -> sink
> >> 2
> >>
> >>
> >> 本意是想通过view  order_source
> >> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
> >>
> >>
>


Re: FLINK SQL view的数据复用问题

2020-08-04 文章 godfrey he
blink planner支持将多sink的query优化成尽量复用重复计算部分。
1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务

kandy.wang  于2020年8月4日周二 下午5:20写道:

> FLINK SQL view相关问题:
> create view order_source
>
> as
>
> select order_id, order_goods_id, user_id,...
>
> from (
>
> ..  proctime,row_number() over(partition by order_id,
> order_goods_id order by proctime desc) as rownum
>
> from hive.temp_dw.dm_trd_order_goods/*+ 
> OPTIONS('properties.group.id'='flink_etl_kafka_hbase',
> 'scan.startup.mode'='latest-offset') */
>
> ) where  rownum = 1 and  price > 0;
>
>
>
>
> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
> BIGINT),)
>
> from
>
> (
>
> select order_date as rowkey,
>
> sum(amount) as saleN,
>
> from order_source
>
> group by order_date
>
> );
>
>
>
>
> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
> BIGINT))
>
> from
>
> (
>
> select order_hour as rowkey,sum(amount) as saleN,
>
>
>
> from order_source
>
> group by order_hour
>
> );
> 问题:同一个view,相同的消费group,不同的sink,产生 2个job。 这样的话,相当于2个job公用一个consumer group。
> 最后生成的job是 : a.  order_source  -> sink  1  b.  order_source  -> sink
> 2
>
>
> 本意是想通过view  order_source
> (view里需要对订单数据去重)复用同一份source全量数据,对应底层可以复用同一份state数据 ,如何做到 ?
>
>


Re: Re: 有界数据中batch和stream的区别

2020-08-04 文章 godfrey he
你的运行环境是啥?能提供一下相关配置吗?

chenxuying  于2020年8月4日周二 下午2:46写道:

> 你好,请问下我修改后的语句是
> insert into print_sink select game_id,count(id) from mysql_source group by
> game_id
> 然后在执行的时候如果选择的是streamMode他会打印出Changelog,如下
> 2> +I(12,1)
> 5> +I(12555,1) 1> +I(122,1) 3> +I(13,1) 6> +I(1,1) 6> -U(1,1) 6> +U(1,2)
> 6> -U(1,2) 6> +U(1,3) 6> -U(1,3) 6> +U(1,4) 6> -U(1,4)
>
>
> 然后如果我使用的是batchMode,他就报错了
> org.apache.flink.util.FlinkException: Error while shutting the
> TaskExecutor down.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:440)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$2(TaskExecutor.java:425)
> ...
> Caused by: java.util.concurrent.CompletionException:
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.flink.util.JavaGcCleanerWrapper
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> ...
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
> ... 21 more
> Suppressed: org.apache.flink.util.FlinkException: Could not properly shut
> down the TaskManager services.
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:236)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.stopTaskExecutorServices(TaskExecutor.java:462)
> at...
> ... 21 more
> Caused by: org.apache.flink.util.FlinkException: Could not close resource.
> at
> org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:204)
> ... 37 more
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.flink.util.JavaGcCleanerWrapper
> at
> org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:94)
> at
> org.apache.flink.runtime.memory.UnsafeMemoryBudget.verifyEmpty(UnsafeMemoryBudget.java:64)
> ...
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
> ... 21 more
> [CIRCULAR REFERENCE:java.lang.NoClassDefFoundError: Could not initialize
> class org.apache.flink.util.JavaGcCleanerWrapper]
>
>
> 不知道您是否知道原因
>
>
> 在 2020-08-04 12:11:32,"godfrey he"  写道:
> >逻辑上批产生的结果是Table,流产生的结果是Changelog。
> >你在例子中Table的结果和changelog的结果是一样的,所以你感觉差不多。
> >最简单的方式可以将query改为带group by的,再看结果的差异。
> >更多关于Table和Changelog的概念可以参考 [1]
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html
> >
> >chenxuying  于2020年8月4日周二 上午11:44写道:
> >
> >> hi :
> >> flink table sql 1.11.0
> >> 在EnvironmentSettings中可以设置BatchMode或StreamingMode
> >>
> >>
> >> EnvironmentSettings environmentSettings =
> EnvironmentSettings.newInstance()
> >> //.inStreamingMode()
> >> .inBatchMode()
> >> .build();
> >>
> >>
> >> 如果使用mysql作为source , 使用这两种模式都可以运行 , 效果都一样 , 感觉对批和流的理解还不够 , 不知道其中区别 ,
> >> 不知道大佬们有没有例子可以比较容易理解
> >> 我的代码
> >> EnvironmentSettings environmentSettings =
> EnvironmentSettings.newInstance()
> >> //.inStreamingMode()
> >> .inBatchMode()
> >> .build();
> >> TableEnvironment tableEnvironment =
> >> TableEnvironment.create(environmentSettings);
> >> tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
> >> " id bigint, " +
> >> "  game_id varchar, " +
> >> "  PRIMARY KEY (id) NOT ENFORCED  " +
> >> " )  " +
> >> " with ( " +
> >> "'connector' = 'jdbc',  " +
> >> " 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
> >> " 'username' = 'root' , " +
> >> " 'password' = 'root', " +
> >> " 'table-name' = 'mysqlsink' , " +
> >> " 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
> >> " 'sink.buffer-flush.interval' = '2s', " +
> >> " 'sink.buffer-flush.max-rows' = '300' " +
> >> " )");
> >> tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
> >> " id bigint, " +
> >> "  game_id varchar, " +
> >> "  PRIMARY KEY (id) NOT ENFORCED  " +
> >> " )  " +
> >> " with ( " +
> >> "'connector' = 'print'  " +
> >> " )");
> >> tableEnvironment.executeSql("insert into print_sink select id,game_id
> from
> >> mysql_source");
>


Re: 有界数据中batch和stream的区别

2020-08-03 文章 godfrey he
逻辑上批产生的结果是Table,流产生的结果是Changelog。
你在例子中Table的结果和changelog的结果是一样的,所以你感觉差不多。
最简单的方式可以将query改为带group by的,再看结果的差异。
更多关于Table和Changelog的概念可以参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html

chenxuying  于2020年8月4日周二 上午11:44写道:

> hi :
> flink table sql 1.11.0
> 在EnvironmentSettings中可以设置BatchMode或StreamingMode
>
>
> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
> //.inStreamingMode()
> .inBatchMode()
> .build();
>
>
> 如果使用mysql作为source , 使用这两种模式都可以运行 , 效果都一样 , 感觉对批和流的理解还不够 , 不知道其中区别 ,
> 不知道大佬们有没有例子可以比较容易理解
> 我的代码
> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
> //.inStreamingMode()
> .inBatchMode()
> .build();
> TableEnvironment tableEnvironment =
> TableEnvironment.create(environmentSettings);
> tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
> " id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED  " +
> " )  " +
> " with ( " +
> "'connector' = 'jdbc',  " +
> " 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
> " 'username' = 'root' , " +
> " 'password' = 'root', " +
> " 'table-name' = 'mysqlsink' , " +
> " 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
> " 'sink.buffer-flush.interval' = '2s', " +
> " 'sink.buffer-flush.max-rows' = '300' " +
> " )");
> tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
> " id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED  " +
> " )  " +
> " with ( " +
> "'connector' = 'print'  " +
> " )");
> tableEnvironment.executeSql("insert into print_sink select id,game_id from
> mysql_source");


Re: Flink DDL 写 Hive parquet 是否支持 snappy压缩

2020-08-03 文章 godfrey he
parquet.compression=SNAPPY,更多信息可参考[1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/parquet.html#format-options





lydata  于2020年8月4日周二 上午11:39写道:

>  Flink DDL的方式 写 Hive parquet  格式 ,是否支持 snappy压缩,如果支持 请问下参数是什么?


Re: UDF:Type is not supported: ANY

2020-08-03 文章 godfrey he
你把Map换为Map试试

zilong xiao  于2020年8月3日周一 下午4:56写道:

> 目前转List可以用数组代替,Map貌似没法成功运行
>
> zilong xiao  于2020年8月3日周一 上午10:43写道:
>
> > 最近在写Flink SQL处理数据的时候发现Flink自带的function不太能满足使用,像很常见的Json2Array
> > Json2Map貌似官方都没有实现,所以需要自定义函数来实现,然后我自己尝试用Jackson来实现时发现在语法检查时总是会报 `Type is
> not
> > supported:
> >
> ANY`,个人猜想这个是不是和Java泛型的特性有关,由于Java是假泛型,最终泛型会被擦除编程Object才会引发这个异常呢?想到Flink本身也有一个字符串转容器的函数
> > STR_TO_MAP,看了下该函数实现是用Scala编写,不确定该异常是否真是由泛型引起,如果是,如果想要Java写Json2Array
> Json2Map
> > udf应该怎么操作呢?求前辈指导
> >
> > udfd代码如下:
> >
> > public class Json2List extends ScalarFunction {
> >
> >private static final Logger LOG =
> LoggerFactory.getLogger(Json2List.class);
> >
> >private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
> >   .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
> >   .configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true) ;
> >
> >public Json2List(){}
> >
> >public List eval(String param) {
> >   List result = new ArrayList<>();
> >   try {
> >  List> list = OBJECT_MAPPER.readValue(param,
> List.class);
> >  for(Map map : list){
> > result.add(OBJECT_MAPPER.writeValueAsString(map));
> >  }
> >  return result;
> >   } catch (JsonProcessingException e){
> >  LOG.error("failed to convert json to array, param is: {}",
> param, e);
> >   }
> >   return result;
> >}
> >
> >
> >@Override
> >public TypeInformation> getResultType(Class[]
> signature) {
> >   return Types.LIST(Types.STRING);
> >}
> >
> > }
> >
> >
>


Re: 数据预览

2020-08-02 文章 godfrey he
如果你想在client端拿到query的结果做preview的话,目前API层面支持直接collect或者print执行结果,可以参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#execute-a-query

Jeff Zhang  于2020年8月1日周六 下午11:01写道:

> Apache Zeppelin有自己的rest api,你可以用rest api来提交flink sql
> 以及拿sql结果,目前Zeppelin社区正在做一个Client API (Zeppelin SDK),
> 用户可以更加方便的调用Zeppelin的功能。具体可以参考
> https://issues.apache.org/jira/browse/ZEPPELIN-4981
>
> 这里有Sample code 可以参考
>
> https://github.com/zjffdu/zeppelin/blob/ZEPPELIN-4981/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java#L298
>
> 对于Flink on Zeppelin感兴趣的,可以加入钉钉群:32803524
>
>
>
> forideal  于2020年8月1日周六 下午7:49写道:
>
> > 你好,我的朋友
> >
> >
> >最近我看 Flink doc 中的文档中有了如下 connector
> >   DataGen
> >   Print
> >   BlackHole
> >这大大的方便了开发和调试。不过,我还是不太满足,想了解一下数据预览相关的做法。
> >比如我想,如果我有一个 Flink 的 `driver` ,然后,我使用这个 driver 提交一条 SQL,我从
> ResultSet
> > 中获取数据。这样又可以大大的方面我们的 Flink SQL 开发者。
> >在社区中,我已经体验了 Apache Zeppelin ,他可以让我提交 Flink SQL,然后在页面上面等待刷新的结果,但是
> > Zeppelin 目前不能很好的集成到我们的 Flink web IDE 中。想了解一下如何实现数据预览。
> >
> >
> >Best forideal
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: StatementSet 里添加多个insertsql执行

2020-07-30 文章 godfrey he
StatementSet 中的多个insert会被编译成一个job提交。
你能提供一下对应的代码样例吗?

op <520075...@qq.com> 于2020年7月30日周四 下午3:57写道:

> 大家好,我发现StatementSet 里添加2个insertsql执行的时候会启动两个application,
> 这两个任务除了sink都是一样的吗?这样是不是会重复计算和浪费资源,而且两边的数据可能不同步,
> 有什么办法能解决?
> 谢谢


Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 文章 godfrey he
Yes, The pr still needs to be improved.
In most cases, there are more than one statement in the sql file,
so -f option should support multiple statements.
however, a related PR [1] has not completed yet.

[1] https://github.com/apache/flink/pull/8738

Best,
Godfrey

Jun Zhang  于2020年7月29日周三 上午10:17写道:

> hi,godfrey:
> Thanks for your reply
>
> 1. I have seen the -u parameter, but my sql file may not only include
> 'insert into select ', but also SET, DDL, etc.
>
> 2. I may not have noticed this issue. I took a look at this issue. I think
> this issue may have some problems. For example, he finally called the
> CliClient.callCommand method.
> But I think that many options in callCommand are not completely suitable
> for sql files, such as HELP, CLEAR, SELECT, etc. The select operation opens
> a window to display the results, obviously this is not suitable for
> executing sql files
>
> godfrey he  于2020年7月29日周三 上午9:56写道:
>
>> hi Jun,
>>
>> Currently, sql client has supported -u option, just like:
>>  ./bin/sql-client.sh embedded -u "insert_statement".
>>
>> There is already a JIRA [1] that wants to support -f option
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12828
>>
>> Best,
>> Godfrey
>>
>> Jun Zhang  于2020年7月29日周三 上午9:22写道:
>>
>>> I want to execute some flink sql batch jobs regularly, such as 'insert
>>> into
>>> select .', but I can't find a suitable method so far, so reference
>>>  hive, I changed the source code and add a  '--filename'  parameter  so
>>> that we can execute a sql file.
>>>
>>> like this:
>>>
>>> /home/flink/bin/sql-client.sh embedded -f flink.sql
>>>
>>> what about any ideas or plans for this feature community?
>>>
>>


Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 文章 godfrey he
hi Jun,

Currently, sql client has supported -u option, just like:
 ./bin/sql-client.sh embedded -u "insert_statement".

There is already a JIRA [1] that wants to support -f option

[1] https://issues.apache.org/jira/browse/FLINK-12828

Best,
Godfrey

Jun Zhang  于2020年7月29日周三 上午9:22写道:

> I want to execute some flink sql batch jobs regularly, such as 'insert into
> select .', but I can't find a suitable method so far, so reference
>  hive, I changed the source code and add a  '--filename'  parameter  so
> that we can execute a sql file.
>
> like this:
>
> /home/flink/bin/sql-client.sh embedded -f flink.sql
>
> what about any ideas or plans for this feature community?
>


Re: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 文章 godfrey he
这个问题只能说是使用TableEnvironment不当的问题。ververica的gateway的模式其实就是多线程。
创建TableEnvironment和使用TableEnvironment可能不是一个线程,worker线程是被复用的。
简单来说就是:
当session创建的时候,worker thread1 会创建一个TableEnvironment,
然后当后续其他该session请求过来时候,可能是 worker thread2使用该TableEnvironment执行sql。

这个其实就是在多线程情况下使用TableEnvironment。不符合TableEnvironment只能在单线程使用的约束。

wind.fly@outlook.com  于2020年7月28日周二 下午2:09写道:

>
> gateway就类似于一个web服务,大概流程是建立连接时会初始化一个session,在session里面初始化TableEnvironment,然后根据sql类型做不同的操作,比如select语句会去执行sqlQuery,具体可查看
> https://github.com/ververica/flink-sql-gateway。
>
> 另外,加了RelMetadataQueryBase.THREAD_PROVIDERS
>
> .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))后确实不报这个错了。
>
> 题外话,个人认为flink不应该将这样的异常抛给用户去解决,除非我去深入研究源码,要不然根本无法搞清楚具体发生了什么,在封装性上还有待改善。
> ________
> 发件人: godfrey he 
> 发送时间: 2020年7月28日 13:55
> 收件人: user-zh 
> 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
>
> 我的怀疑点还是多线程引起的。
> 你能具体描述一下你们gateway的行为吗? 是一个web server?
>
> 另外,你可以在table env执行query前加上
> RelMetadataQueryBase.THREAD_PROVIDERS
>
> .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));
> 这句话临时fix。
>
> wind.fly@outlook.com  于2020年7月28日周二
> 上午11:02写道:
>
> > 不是多线程同时操作一个tableEnvironment,每执行一次都会创建一个TableEnvironment
> > 
> > 发件人: godfrey he 
> > 发送时间: 2020年7月28日 9:58
> > 收件人: user-zh 
> > 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
> >
> > 你们是否在多线程环境下使用 TableEnvironment ?
> > TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。
> >
> > godfrey he  于2020年7月28日周二 上午9:55写道:
> >
> > > hi 能给出详细的schema信息吗?
> > >
> > > wind.fly@outlook.com  于2020年7月27日周一
> > > 下午7:02写道:
> > >
> > >> 补充一下,执行的sql如下:
> > >>
> > >> select order_no, order_time from
> > >> x.ods.ods_binlog_test_trip_create_t_order_1
> > >>
> > >> 
> > >> 发件人: wind.fly@outlook.com 
> > >> 发送时间: 2020年7月27日 18:49
> > >> 收件人: user-zh@flink.apache.org 
> > >> 主题: flink1.11.0 执行sqlQuery时报NullPointException
> > >>
> > >> Hi,all:
> > >>  本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
> > >> catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
> > >> Caused by: java.lang.NullPointerException
> > >>   at java.util.Objects.requireNonNull(Objects.java:203)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
> > >>   at
> > >>
> >
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
> > >>   at
> org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
> > >>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
> > >>   at
> > >> org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> > >>   at
> > >>
> >
> o

Re: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 文章 godfrey he
我的怀疑点还是多线程引起的。
你能具体描述一下你们gateway的行为吗? 是一个web server?

另外,你可以在table env执行query前加上
RelMetadataQueryBase.THREAD_PROVIDERS
.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));
这句话临时fix。

wind.fly@outlook.com  于2020年7月28日周二 上午11:02写道:

> 不是多线程同时操作一个tableEnvironment,每执行一次都会创建一个TableEnvironment
> 
> 发件人: godfrey he 
> 发送时间: 2020年7月28日 9:58
> 收件人: user-zh 
> 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
>
> 你们是否在多线程环境下使用 TableEnvironment ?
> TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。
>
> godfrey he  于2020年7月28日周二 上午9:55写道:
>
> > hi 能给出详细的schema信息吗?
> >
> > wind.fly@outlook.com  于2020年7月27日周一
> > 下午7:02写道:
> >
> >> 补充一下,执行的sql如下:
> >>
> >> select order_no, order_time from
> >> x.ods.ods_binlog_test_trip_create_t_order_1
> >>
> >> 
> >> 发件人: wind.fly@outlook.com 
> >> 发送时间: 2020年7月27日 18:49
> >> 收件人: user-zh@flink.apache.org 
> >> 主题: flink1.11.0 执行sqlQuery时报NullPointException
> >>
> >> Hi,all:
> >>  本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
> >> catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
> >> Caused by: java.lang.NullPointerException
> >>   at java.util.Objects.requireNonNull(Objects.java:203)
> >>   at
> >>
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> >>   at
> >>
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> >>   at
> >>
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
> >>   at
> >>
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
> >>   at
> >>
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
> >>   at
> >>
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
> >>   at
> >>
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
> >>   at
> >>
> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
> >>   at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
> >>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
> >>   at
> >> org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> >>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
> >>   at
> >>
> org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
> >>   at
> >>
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
> >>   at
> >>
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
> >>   at
> >>
> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
> >>   at
> >>
> org.apache.flink.table.catalog.CatalogMan

Re: flink1.11.1使用Table API Hive方言的executSql报错

2020-07-27 文章 godfrey he
你的包是完整的flink-1.11.1的包吗?
例如 check一下 ClusterClientJobClientAdapter 这个类是否继承 CoordinationRequestGateway
?

shimin huang  于2020年7月28日周二 上午11:21写道:

> Hi,all:
>   本人基于Flink1.11.1的table API使用Hive方言,调用executSql方法后报错,堆栈信息如下:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Failed to execute sql
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:302) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.client.program.PackagedProgram
> .invokeInteractiveModeForExecution(PackagedProgram.java:198)
> ~[flink-dist_2.
> 11-1.11.1.jar:1.11.1]
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:
> 149) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.client.deployment.application.
> DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.client.deployment.application.
> DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
> .lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.11-1.11.1
> .jar:1.11.1]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(
> CompletableFuture.java:1604) [?:1.8.0_242]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:
> 511) [?:1.8.0_242]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [?:1.8.0_242
> ]
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask
> .access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242]
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask
> .run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_242]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
> .java:1149) [?:1.8.0_242]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
> .java:624) [?:1.8.0_242]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
> at org.apache.flink.table.api.internal.TableEnvironmentImpl
> .executeInternal(TableEnvironmentImpl.java:747) ~[flink-table-blink_2.11-
> 1.11.1.jar:1.11.1]
> at org.apache.flink.table.api.internal.TableEnvironmentImpl
> .executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11-
> 1.11.1.jar:1.11.1]
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(
> TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.
> 0_242]
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl
> .java:62) ~[?:1.8.0_242]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242]
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> ... 13 more
> Caused by: java.lang.IllegalArgumentException: Job client must be a
> CoordinationRequestGateway. This is a bug.
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:
> 139) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher
> .setJobClient(CollectResultFetcher.java:97) ~[flink-dist_2.11-1.11.1.jar:
> 1.11.1]
> at org.apache.flink.streaming.api.operators.collect.
> CollectResultIterator.setJobClient(CollectResultIterator.java:84)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.table.planner.sinks.SelectTableSinkBase
> .setJobClient(SelectTableSinkBase.java:81) ~[flink-table-blink_2.11-1.11.1
> .jar:1.11.1]
> at org.apache.flink.table.api.internal.TableEnvironmentImpl
> .executeInternal(TableEnvironmentImpl.java:737) ~[flink-table-blink_2.11-
> 1.11.1.jar:1.11.1]
> at org.apache.flink.table.api.internal.TableEnvironmentImpl
> .executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11-
> 1.11.1.jar:1.11.1]
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(
> TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.
> 0_242]
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl
> .java:62) ~[?:1.8.0_242]
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242]
> at org.apache.flink.client.program.PackagedProgram.

Re: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 文章 godfrey he
你们是否在多线程环境下使用 TableEnvironment ?
TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。

godfrey he  于2020年7月28日周二 上午9:55写道:

> hi 能给出详细的schema信息吗?
>
> wind.fly@outlook.com  于2020年7月27日周一
> 下午7:02写道:
>
>> 补充一下,执行的sql如下:
>>
>> select order_no, order_time from
>> x.ods.ods_binlog_test_trip_create_t_order_1
>>
>> 
>> 发件人: wind.fly@outlook.com 
>> 发送时间: 2020年7月27日 18:49
>> 收件人: user-zh@flink.apache.org 
>> 主题: flink1.11.0 执行sqlQuery时报NullPointException
>>
>> Hi,all:
>>  本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
>> catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
>> Caused by: java.lang.NullPointerException
>>   at java.util.Objects.requireNonNull(Objects.java:203)
>>   at
>> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
>>   at
>> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
>>   at
>> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
>>   at
>> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
>>   at
>> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
>>   at
>> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
>>   at
>> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
>>   at
>> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
>>   at
>> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
>>   at
>> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
>>   at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
>>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
>>   at
>> org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>>   at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>>   at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>>   at
>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
>>   at
>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
>>   at
>> org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
>>   at
>> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
>>   at
>> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
>>   at
>> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
>>   at
>> org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
>>   at
>> org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
>>   at
>> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
>>   at
>> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>>   at
>> org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>>   at
>> org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
>>   at
>> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
>>   at
>> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>>   at
>> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
>>   at
>> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNam

Re: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-27 文章 godfrey he
hi 能给出详细的schema信息吗?

wind.fly@outlook.com  于2020年7月27日周一 下午7:02写道:

> 补充一下,执行的sql如下:
>
> select order_no, order_time from
> x.ods.ods_binlog_test_trip_create_t_order_1
>
> 
> 发件人: wind.fly@outlook.com 
> 发送时间: 2020年7月27日 18:49
> 收件人: user-zh@flink.apache.org 
> 主题: flink1.11.0 执行sqlQuery时报NullPointException
>
> Hi,all:
>  本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
> catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
> Caused by: java.lang.NullPointerException
>   at java.util.Objects.requireNonNull(Objects.java:203)
>   at
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
>   at
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
>   at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
>   at
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
>   at
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
>   at
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
>   at
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
>   at
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
>   at
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
>   at
> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
>   at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
>   at org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>   at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>   at
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
>   at
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
>   at
> org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
>   at
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
>   at
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
>   at
> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
>   at
> org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
>   at
> org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
>   at
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
>   at
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>   at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>   at
> org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
>   at
> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
>   at
> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>   at
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
>   at
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
>   at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
>   at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
>   at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>   at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNa

Re: flink 1.11 ddl sql 添加PROCTIME()列,读取csv错误

2020-07-23 文章 godfrey he
和hive结合下,filesystem是支持流式读取的,可以参考 [1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/hive/hive_streaming.html#streaming-reading

Leonard Xu  于2020年7月23日周四 下午10:28写道:

> Hi,
>
> Filesystem connector 支持streaming 写入,streaming 读取
> 还未支持,所以读取完了就停止。支持streaming 写入从文档上看[1]应该是有计划的
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/filesystem.html
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/filesystem.html
> >
>
>
> > 在 2020年7月23日,22:05,Asahi Lee <978466...@qq.com> 写道:
> >
> > 使用filesystem读取csv作为源,使用流环境,为什么我的程序一执行就停止,而不是等待文件的追加写入,继续计算呢?
> > 还是filesystem只能用于批操作?
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人:
> "user-zh"
>   <
> xbjt...@gmail.com >;
> > 发送时间: 2020年7月23日(星期四) 上午9:55
> > 收件人: "user-zh" user-zh@flink.apache.org>>;
> >
> > 主题: Re: flink 1.11 ddl sql 添加PROCTIME()列,读取csv错误
> >
> >
> >
> > Hi, Asahi
> >
> > 这是一个已知bug[1],filesystem connector上处理计算列有点问题,已经有PR了,会在1.11.2和1.12版本上修复
> >
> >
> > Best
> > Leonard Xu
> > [1] https://issues.apache.org/jira/browse/FLINK-18665 <
> https://issues.apache.org/jira/browse/FLINK-18665> <
> https://issues.apache.org/jira/browse/FLINK-18665> <
> https://issues.apache.org/jira/browse/FLINK-18665>>;
> >
> > > 在 2020年7月23日,00:07,Asahi Lee <978466...@qq.com  978466...@qq.com>> 写道:
> > >
> > > 1. 程序
> > > StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> > >         EnvironmentSettings
> bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > >         StreamTableEnvironment
> bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
> > >
> > >
> > >         String sourceTableDDL =
> "CREATE TABLE fs_table (" +
> > >            
>     "  user_id STRING," +
> > >            
>     "  order_amount DOUBLE," +
> > >            
>     "  dt TIMESTAMP(3)," +
> > >            
>     "  pt AS PROCTIME() " +
> > >            
>     " ) WITH (" +
> > >            
>     "  'connector'='filesystem'," +
> > >            
>     "  'path'='D:\\Program
> Files\\JetBrains\\workspace\\table-walkthrough\\src\\main\\resources\\csv\\order.csv',"
> +
> > >            
>     "  'format'='csv'" +
> > >            
>     " )";
> > >
> > >
> > >        
> bsTableEnv.executeSql(sourceTableDDL);
> > >        
> bsTableEnv.executeSql("select * from fs_table").print();
> > > 2. csv文件
> > > order.csv
> > > zhangsan,12.34,2020-08-03 12:23:50
> > > lisi,234.67,2020-08-03 12:25:50
> > > wangwu,57.6,2020-08-03 12:25:50
> > > zhaoliu,345,2020-08-03 12:28:50
> > >
> > >
> > >
> > > 3. 错误
> > >  - Source: FileSystemTableSource(user_id, order_amount,
> dt, pt) -> Calc(select=[user_id, order_amount, dt,
> PROCTIME_MATERIALIZE(()) AS pt]) -> SinkConversionToRow (4/6)
> (9ee0383d676a190b0a62d206039db26c) switched from RUNNING to FAILED.
> > > java.io.IOException: Failed to deserialize CSV row.
> > >  at
> org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:299)
> > >  at
> org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:210)
> > >  at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
> > >  at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> > >  at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> > >  at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> > > Caused by: java.lang.RuntimeException: Row length mismatch. 4
> fields expected but was 3.
> > >  at
> org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.validateArity(CsvRowDataDeserializationSchema.java:441)
> > >  at
> org.apache.flink.formats.csv.CsvRowDataDeserializationSchema.lambda$createRowConverter$1ca9c073$1(CsvRowDataDeserializationSchema.java:244)
> > >  at
> org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:293)
> > >  ... 5 more
>
>


Re: Re: flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

2020-07-23 文章 godfrey he
1.10 也是支持的

Michael Ran  于2020年7月22日周三 下午9:07写道:

> 1.tableEvn.from(xx).getSchema() 我确实通过这个拿到了schema,2.with
> properties属性很重要 ,关系我自定义的一些参数设定。3.关于  catalog 这个东西,是不是只有1.11
> 版本才能从catalog  获取  with properties 哦? 1.10 you  有支持吗
> 在 2020-07-22 18:22:22,"godfrey he"  写道:
> >tableEnv 中 可以通过
> >tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。
> >如果要拿到properties,可以通过catalog的接口得到 [1]。
> >如果要自定义实现source/sink,可以参考 [2]
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html
> >[2]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html
> >
> >Best,
> >Godfrey
> >
> >
> >
> >
> >
> >Michael Ran  于2020年7月22日周三 下午4:10写道:
> >
> >> dear all:
> >>  我用flink 注册一张表:
> >>   CREATE TABLE dim_mysql (
> >> id int,  --
> >> type varchar --
> >> ) WITH (
> >> 'connector' = 'jdbc',
> >> 'url' = 'jdbc:mysql://localhost:3390/test',
> >> 'table-name' = 'flink_test',
> >> 'driver' = 'com.mysql.cj.jdbc.Driver',
> >> 'username' = '',
> >> 'password' = '',
> >> 'lookup.cache.max-rows' = '5000',
> >> 'lookup.cache.ttl' = '1s',
> >> 'lookup.max-retries' = '3'
> >> )
> >> 有没有通过 tableEnv 去获取,字段[id,type]  类型[INTEGER,VARCHAR]
> >> 以及属性,map 这种。
> >> 我看阿里官方有blink 支持自定义sink:
> >> publicabstractclassCustomSinkBaseimplementsSerializable{
> >> protectedMap userParamsMap;// 您在sql
> with语句中定义的键值对,但所有的键均为小写
> >> protectedSet primaryKeys;// 您定义的主键字段名
> >> protectedList headerFields;// 标记为header的字段列表
> >> protectedRowTypeInfo rowTypeInfo;// 字段类型和名称
> >> 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑
>


Re: flink 1.11 ddl 写mysql的问题

2020-07-23 文章 godfrey he
你观察到有sink写不过来导致反压吗?
或者你调大flush interval试试,让每个buffer攒更多的数据

曹武 <14701319...@163.com> 于2020年7月23日周四 下午4:48写道:

> 我使用fink 1.11.1 做cdc,发现一秒钟只能写100条左右数据到mysql,请问有优化方案,或者是其他的批量写入的方案建议嘛
> 代码如下:
> String sourceDdl =" CREATE TABLE debezium_source " +
> "( " +
> "id STRING NOT NULL, name STRING, description STRING,
> weight
> Double" +
> ") " +
> "WITH (" +
> " 'connector' = 'kafka-0.11'," +
> " 'topic' = 'test0717'," +
> " 'properties.bootstrap.servers' = ' 172.22.20.206:9092',
> "
> +
> "'scan.startup.mode' =
> 'group-offsets','properties.group.id'='test'," +
> "'format' = 'debezium-json'," +
> "'debezium-json.schema-include'='false'," +
> "'debezium-json.ignore-parse-errors'='true')";
> tEnv.executeSql(sourceDdl);
> System.out.println("init source ddl successful ==>" + sourceDdl);
> String sinkDdl = " CREATE TABLE sink " +
> "( " +
> "id STRING NOT NULL," +
> " name STRING, " +
> "description STRING," +
> " weight Double," +
> " PRIMARY KEY (id) NOT ENFORCED " +
> ")" +
> " WITH " +
> "( " +
> "'connector' = 'jdbc', " +
> "'url' =
> 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true', " +
> "'table-name' = 'table-out', " +
> "'driver'= 'com.mysql.cj.jdbc.Driver'," +
> "'sink.buffer-flush.interval'='1s'," +
> "'sink.buffer-flush.max-rows'='1000'," +
> "'username'='DataPip', " +
> "'password'='DataPip')";
> tEnv.executeSql(sinkDdl);
> System.out.println("init sink ddl successful ==>" + sinkDdl);
>
>  String dml = "INSERT INTO sink SELECT  id,name ,description,
> weight FROM debezium_source";
> System.out.println("execute dml  ==>" + dml);
> tEnv.executeSql(dml);
> tEnv.executeSql("CREATE TABLE print_table WITH ('connector' =
> 'print')" +
> "LIKE debezium_source (EXCLUDING ALL)");
> tEnv.executeSql("INSERT INTO print_table SELECT  id,name
> ,description,  weight FROM debezium_source");
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 文章 godfrey he
hi,
目前没有解决办法,insert job根据sink表名自动生成job name。
后续解法关注 https://issues.apache.org/jira/browse/FLINK-18545

Weixubin <18925434...@163.com> 于2020年7月23日周四 下午6:07写道:

> Hi,
> 我想请教下,使用streamExecutionEnv.execute("from kafka sink hbase") 是可以指定Job的名称。
> 而当改用streamTableEnv.executeSql(sql)的方式时,似乎无法定义Job的名称。
> 请问有什么解决的方法吗?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-08 16:07:17,"Jingsong Li"  写道:
> >Hi,
> >
> >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。
> >
> >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")"
> >并没有真正的物理节点。你不用再调用了。
> >
> >Best,
> >Jingsong
> >
> >On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach  wrote:
> >
> >>
> >>
> >>
> >> 代码结构改成这样的了:
> >>
> >>
> >>
> >>
> >> val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
> >>
> >> val blinkEnvSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >>
> >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
> >> blinkEnvSettings)
> >>
> >>
> >>
> >>
> >>
> >> streamExecutionEnv.execute("from kafka sink hbase")
> >>
> >>
> >>
> >>
> >> 还是报一样的错
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-08 15:40:41,"夏帅"  写道:
> >> >你好,
> >> >可以看看你的代码结构是不是以下这种
> >> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> >> >val bsSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
> >> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
> >> >  ..
> >> >tableEnv.execute("")
> >> >如果是的话,可以尝试使用bsEnv.execute("")
> >> >1.11对于两者的execute代码实现有改动
> >> >
> >> >
> >> >--
> >> >发件人:Zhou Zach 
> >> >发送时间:2020年7月8日(星期三) 15:30
> >> >收件人:Flink user-zh mailing list 
> >> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming
> topology
> >> >
> >> >代码在flink
> >>
> 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
> >> >Exception in thread "main" java.lang.IllegalStateException: No
> operators
> >> defined in streaming topology. Cannot generate StreamGraph.
> >> >at
> >>
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> >> >at
> >>
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> >> >at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> >> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
> >> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)
> >> >
> >> >
> >> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。
> >> >
> >> >
> >> >
> >> >
> >> >query:
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|CREATE TABLE `user` (
> >> >|uid BIGINT,
> >> >|sex VARCHAR,
> >> >|age INT,
> >> >|created_time TIMESTAMP(3),
> >> >|WATERMARK FOR created_time as created_time - INTERVAL '3'
> >> SECOND
> >> >|) WITH (
> >> >|'connector.type' = 'kafka',
> >> >|'connector.version' = 'universal',
> >> >|-- 'connector.topic' = 'user',
> >> >|'connector.topic' = 'user_long',
> >> >|'connector.startup-mode' = 'latest-offset',
> >> >|'connector.properties.group.id' = 'user_flink',
> >> >|'format.type' = 'json',
> >> >|'format.derive-schema' = 'true'
> >> >|)
> >> >|""".stripMargin)
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|CREATE TABLE user_hbase3(
> >> >|rowkey BIGINT,
> >> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
> >> >|) WITH (
> >> >|'connector.type' = 'hbase',
> >> >|'connector.version' = '2.1.0',
> >> >|'connector.table-name' = 'user_hbase2',
> >> >|'connector.zookeeper.znode.parent' = '/hbase',
> >> >|'connector.write.buffer-flush.max-size' = '10mb',
> >> >|'connector.write.buffer-flush.max-rows' = '1000',
> >> >|'connector.write.buffer-flush.interval' = '2s'
> >> >|)
> >> >|""".stripMargin)
> >> >
> >> >
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|insert into user_hbase3
> >> >|SELECT uid,
> >> >|
> >> >|  ROW(sex, age, created_time ) as cf
> >> >|  FROM  (select uid,sex,age, cast(created_time as VARCHAR) as
> >> created_time from `user`)
> >> >|
> >> >|""".stripMargin)
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >
> >
> >--
> >Best, Jingsong Lee
>


Re: flink 1.11 executeSql查询kafka表print没有输出

2020-07-23 文章 godfrey he
client端会不断的pull sink产生的数据,但是只有等checkpoint完成后,其对应的数据才能 collect() 和 print()
返回。
这是为了保证exactly once语义。
在1.12里,同时支持了at least once 和 exactly once 语义。默认情况下是 at least once,collect()
和 print() 的结果可能有重复。
如果有兴趣可以参考pr:https://github.com/apache/flink/pull/12867
<https://github.com/apache/flink/pull/12867#event-3578490750>

Best,
Godfrey

wind.fly@outlook.com  于2020年7月23日周四 下午7:34写道:

> Hi,Godfrey:
>  加了checkpoint后确实可以了,能具体讲一下原理吗?print是在完成快照的时候顺便把结果输出了吗?或者有没有相关文档?
>
> Best,
> Junbao Zhang
> ________
> 发件人: godfrey he 
> 发送时间: 2020年7月23日 19:24
> 收件人: user-zh 
> 主题: Re: flink 1.11 executeSql查询kafka表print没有输出
>
> 1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下,
> 都是exactly once语义,需要配置checkpoint才能得到结果。
>
> Best,
> Godfrey
>
> wind.fly@outlook.com  于2020年7月23日周四
> 下午7:22写道:
>
> > Hi, all:
> >
> >
> 本人当前使用flink版本1.11.0,但是在执行executeSql后,print时没有在console打印出结果(查看kafka是一直有数据产生的),
> > sql如下:
> >
> >
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > EnvironmentSettings settings =
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> settings);
> >
> > Catalog catalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
> > tEnv.registerCatalog("x", catalog);
> >
> > TableResult execute = tEnv.executeSql("select * from
> > x.ods.ods_binlog_test_trip_create_t_order_1");
> >
> > execute.print();
> >
> > 建表语句如下:
> >
> > CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 (
> >   _type STRING,
> >   order_no STRING,
> >   order_time STRING,
> >   dt as TO_TIMESTAMP(order_time),
> >   proctime as PROCTIME(),
> >   WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.properties.bootstrap.servers' = '***',
> >   'connector.properties.zookeeper.connect' = '',
> >   'connector.version' = 'universal',
> >   'format.type' = 'json',
> >   'connector.properties.group.id' = 'testGroup',
> >   'connector.startup-mode' = 'group-offsets',
> >   'connector.topic' = 'test'
> > )
> >
>


Re: flink 1.11 executeSql查询kafka表print没有输出

2020-07-23 文章 godfrey he
1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下,
都是exactly once语义,需要配置checkpoint才能得到结果。

Best,
Godfrey

wind.fly@outlook.com  于2020年7月23日周四 下午7:22写道:

> Hi, all:
>
> 本人当前使用flink版本1.11.0,但是在执行executeSql后,print时没有在console打印出结果(查看kafka是一直有数据产生的),
> sql如下:
>
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>
> Catalog catalog = new HiveCatalog("x", "default", "D:\\conf", "1.1.0");
> tEnv.registerCatalog("x", catalog);
>
> TableResult execute = tEnv.executeSql("select * from
> x.ods.ods_binlog_test_trip_create_t_order_1");
>
> execute.print();
>
> 建表语句如下:
>
> CREATE TABLE x.ods.ods_binlog_test_trip_create_t_order_1 (
>   _type STRING,
>   order_no STRING,
>   order_time STRING,
>   dt as TO_TIMESTAMP(order_time),
>   proctime as PROCTIME(),
>   WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.properties.bootstrap.servers' = '***',
>   'connector.properties.zookeeper.connect' = '',
>   'connector.version' = 'universal',
>   'format.type' = 'json',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'group-offsets',
>   'connector.topic' = 'test'
> )
>


Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 文章 godfrey he
这个问题的已经有一个issue:https://issues.apache.org/jira/browse/FLINK-18545,请关注

WeiXubin <18925434...@163.com> 于2020年7月23日周四 下午6:00写道:

> Hi,
> 我想请问下使用 streamExecutionEnv.execute("from kafka sink
> hbase"),通过这种方式可以给Job指定名称。
> 而当使用streamTableEnv.executeSql(sql)之后似乎无法给Job定义名称。
> 请问有什么解决方案吗?谢谢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink sqlUpdate,如何获取里面的字段,字段类型,with 等3个属性

2020-07-22 文章 godfrey he
tableEnv 中 可以通过
tableEvn.from(xx).getSchema() 拿到该表的schema信息,但是没法拿到对应的properties。
如果要拿到properties,可以通过catalog的接口得到 [1]。
如果要自定义实现source/sink,可以参考 [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/catalogs.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sourceSinks.html

Best,
Godfrey





Michael Ran  于2020年7月22日周三 下午4:10写道:

> dear all:
>  我用flink 注册一张表:
>   CREATE TABLE dim_mysql (
> id int,  --
> type varchar --
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://localhost:3390/test',
> 'table-name' = 'flink_test',
> 'driver' = 'com.mysql.cj.jdbc.Driver',
> 'username' = '',
> 'password' = '',
> 'lookup.cache.max-rows' = '5000',
> 'lookup.cache.ttl' = '1s',
> 'lookup.max-retries' = '3'
> )
> 有没有通过 tableEnv 去获取,字段[id,type]  类型[INTEGER,VARCHAR]
> 以及属性,map 这种。
> 我看阿里官方有blink 支持自定义sink:
> publicabstractclassCustomSinkBaseimplementsSerializable{
> protectedMap userParamsMap;// 您在sql with语句中定义的键值对,但所有的键均为小写
> protectedSet primaryKeys;// 您定义的主键字段名
> protectedList headerFields;// 标记为header的字段列表
> protectedRowTypeInfo rowTypeInfo;// 字段类型和名称
> 核心需求是:获取定义的表的所有属性,自己实现自己的功能,包括 join sink 等各种逻辑


Re: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 文章 godfrey he
Hi,首维

感谢给出非常详细的反馈。这个问题我们之前内部也有一些讨论,但由于缺乏一些真实场景,最后维持了当前的接口。
我们会根据你提供的场景进行后续讨论。

Best,
Godfrey

刘首维  于2020年7月22日周三 下午5:23写道:

> Hi, Jark
>
>
>
>感谢你的建议!
>
>我们这边充分相信社区在SQL/Table上的苦心孤诣,也愿意跟进Flink在新版本的变化。
>
>先看我遇到问题本身,你的建议确实可以帮助我解决问题。我想聊一下我对问题之外的一些想法
>
>```
>
>  >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter
> 用来做缓冲池/微批/数据过滤等功能
> 这个我觉得也可以封装在 SinkFunction 里面。
>
>   ```
>
>  
> 比如上述这个问题2,我们确实可以把它做到SinkFunction中,但是我个人认为这可能在设计上不够理想的。我个人在设计编排Function/算子的时候习惯于遵循”算子单一职责”的原则,这也是我为什么会拆分出多个process/filter算子编排到SinkFunction前面而非将这些功能耦合到SinkFunction去做。另一方面,没了DataStream,向新的API的迁移成本相对来说变得更高了一些~
> 又或者,我们现在还有一些特殊原因,算子编排的时候会去修改TaskChain Strategy,这个时候DataStream的灵活性是必不可少的
>
> 考虑到Flink Task都可以拆分成Source -> Transformation -> sink
> 三个阶段,那么能让用户可以对自己的作业针对(流或批)的运行模式下,可以有效灵活做一些自己的定制策略/优化/逻辑可能是会方便的~
>
>诚然,DataStream的灵活性确实会是一把双刃剑,但就像@leonard提到的,平台层和应用层的目的和开发重点可能也不太一样,对Flink
> API使用侧重点也不同。我个人还是希望可以在享受全新API设计优势同时,
>
> 可以继续使用DataStream(Transformation)的灵活性,助力Flink组件在我们组的开落地
>
>
> 再次感谢各位的回复!
>
> 
> 发件人: Jark Wu 
> 发送时间: 2020年7月22日 16:33:45
> 收件人: user-zh
> 抄送: godfrey he; greemqq...@163.com; 刘首维
> 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
>
> Hi,首维,
>
> 非常感谢反馈。与 DataStream 解耦是 FLIP-95 的一个非常重要的设计目标,这让 sink/source 对于框架来说不再是黑盒,
> 因此将来才可以做诸如 state 兼容升级、消息顺序保障、自动并发设置等等事情。
>
> 关于你的一些需求,下面是我的建议和回复:
>
> >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> 这个理论上还属于“数据格式”的职责,所以建议做在 DeserializationSchema 上,目前 DeserializationSchema
> 支持一对多的输出。可以参考 DebeziumJsonDeserializationSchema 的实现。
>
> >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> 这个我觉得也可以封装在 SinkFunction 里面。
>
> >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> 这个社区也有计划在 FLIP-95 之上支持,会提供并发(或者分区)推断的能力。
>
> >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> 这个能在具体一点吗?目前像 SupportsPartitioning 接口,就可以指定数据在交给 sink 之前先做 group by
> partition。我感觉这个可能也可以通过引入类似的接口解决。
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 16:27, Leonard Xu  xbjt...@gmail.com>> wrote:
> Hi,首维, Ran
>
> 感谢分享, 我理解1.11新的API主要是想把 Table API 和 DataStream API 两套尽量拆分干净,
> 但看起来平台级的开发工作会依赖DataStream的一些预处理和用户逻辑。
> 我觉得这类需求对平台开发是合理,可以收集反馈下的, cc: godfrey
>
> 祝好
> Leonard Xu
>
>
> > 在 2020年7月22日,13:47,刘首维  liushou...@autohome.com.cn>> 写道:
> >
> > Hi JingSong,
> >
> >
> 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL
> SDK
> >  下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子
> >
> >
> >  1.  我们现在有某种特定的Kafka数据格式,1条Kafka数据
> 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。
> >  2.  我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
> >  3.  调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
> >  4.  对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的
> >
> >
> > 如果可以的话,能让我在API层面拿到Transformation也是能满足我需求的
> >
> >
> > 
> > 发件人: Jingsong Li mailto:jingsongl...@gmail.com>>
> > 发送时间: 2020年7月22日 13:26:00
> > 收件人: user-zh
> > 抄送: imj...@gmail.com<mailto:imj...@gmail.com>
> > 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题
> >
> > 可以分享下你们为啥要拿到DataStream吗?什么场景一定离不开DataStream吗?
> >
> > Best
> > Jingsong
> >
> > On Wed, Jul 22, 2020 at 12:36 PM 刘首维  liushou...@autohome.com.cn>> wrote:
> >
> >> Hi all,
> >>
> >>
> >>
> >>很高兴看到Flink 1.11的发布,FLIP95和FLIP105也成功落地~
> >>
> >>我们最近在调研基于1.11的SQL/Table API对我们旧有的SQL
> >>
> SDK进行升级。经过初步调研发现,基于`Factory`和`DynamicTable`的API,CatalogTable会被Planner直接变成Transformation,而不是跟之前一样可以拿到DataStream。比如之前的`StreamTableSource`#getDataStream,我可以直接获得DataStream对象的。这个动作/方法对于我们很重要,因为我们封装的SDK中,很多内部操作是在这个方法调用时触发/完成的。
> >>
> >>
> >>
> >>所以,如果基于新的API去开发的话,我该如何获取到DataStream,或者达成类似的效果呢(尽可能不去动执行计划的话)
> >>
> >
> >
> > --
> > Best, Jingsong Lee
>
>


Re: Flink catalog的几个疑问

2020-07-21 文章 godfrey he
hi Xingxing,

1. Flink 提供了一套catalog的接口,提提供了几个内置的实现:in-memory catalog, hive catalog,
postgres catalog,
可以根据自己的需求选择。也可以实现自定义的catalog。参考 [1]
2. hive catalog 主要是对接 hive,方便读取现有的hive catalog的meta信息。当然也可以往hive
catalog写新的meta。
是否会转为默认catalog,据我所知,目前没有。
3. 一般没什么问题。在和其他区分大小写的db对接的时候,可能有问题。

Best,
Godfrey

dixingxin...@163.com  于2020年7月21日周二 下午11:30写道:

> Hi Flink社区:
> 有几个疑问希望社区小伙伴们帮忙解答一下:
>
> 1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip
> 2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗?
> 3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。
>
>
>
>
> Best,
> Xingxing Di
>


Re: [sql-client] 如何绕过交互式模式去做ddl

2020-07-21 文章 godfrey he
sql-client.sh的-u是指update语句,目前只支持insert。

Jark Wu  于2020年7月21日周二 下午6:47写道:

> Hi,
>
> 你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。
> 社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。
> 不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828.
>
> Best,
> Jark
>
> On Thu, 16 Jul 2020 at 19:43, Harold.Miao  wrote:
>
> > hi flink users
> >
> > 众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,
> > 这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


Re: flink1.11 tablefunction

2020-07-21 文章 godfrey he
可以,定义清楚 getResultType 和 getParameterTypes, 可以参考[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide


Dream-底限  于2020年7月21日周二 下午7:25写道:

> hi
>
> 我这面在定义一个表函数,通过继承TableFunction完成操作,但是eval方法中的参数类型都是java基本类型(至少看到的demo都是如此),想问一下eval方法中可以传flink
> 内部类型吗,比如说我想在eval()方法中传递Row类型要怎么操作,eval(Row row)
>


Re: flink table同时作为写出及输入时下游无数据

2020-07-21 文章 godfrey he
你可以先只跑第一个insert,然后check一下g_sink_unit是否有数据。

另外,你可以把query 改为都读取 kafka_source_tab 再分别写到两个不同的sink:

sql1='''Insert into g_sink_unit select alarm_id,trck_id from
kafka_source_tab'''
sql2='''Insert into g_summary_base select alarm_id,trck_id from
kafka_source_tab;'''

小学生 <201782...@qq.com> 于2020年7月21日周二 下午5:47写道:

>
> 各位大佬好,请教一个问题,就是在flink内部定义一个表g_unit(初始为空),接受一个kafka源的写入,同时g_unit又要作为下游表g_summary的输入源,测试发现g_line表一直不会写入数据,代码如下,烦请大佬解答。
>
>
>
>
> from pyflink.datastream import StreamExecutionEnvironment,
> TimeCharacteristic, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
>
>
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> env.set_parallelism(1)
> env_settings =
> EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> t_env = StreamTableEnvironment.create(env,
> environment_settings=env_settings)
>
>
>
> kafka_source_ddl = """
> CREATE TABLE kafka_source_tab (
>  id VARCHAR,   
>  alarm_id VARCHAR,   
>  trck_id VARCHAR
>
>
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'gg',   
>  'scan.startup.mode' = 'specific-offsets', 
>  'scan.startup.specific-offsets'='partition:1,offset:0',
>  'properties.bootstrap.servers' = '',
>  'format' = 'json'
> )
> """
> g_unit_sink_ddl = """
> CREATE TABLE g_sink_unit (
>  alarm_id VARCHAR,   
>  trck_id VARCHAR
>  
> ) WITH (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
>  'table-name' = 'g_unit',   
>  'username' = 'root',
>  'password' = 'root',
>  'sink.buffer-flush.interval' = '1s'     
> )
> """
> g_summary_ddl = """
> CREATE TABLE g_summary_base(
>  alarm_id VARCHAR,   
>  trck_id VARCHAR
> ) WITH (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
>  'table-name' = 'g_summary', 
>  'username' = 'root',
>  'password' = 'root',
>  'sink.buffer-flush.interval' = '1s'
> )
> """
>
> t_env.execute_sql(kafka_source_ddl)
> t_env.execute_sql(g_unit_sink_ddl)
> t_env.execute_sql(g_summary_ddl)
>
>
> sql1='''Insert into g_sink_unit select alarm_id,trck_id from
> kafka_source_tab'''
> sql2='''Insert into g_summary_base select alarm_id,trck_id from
> g_sink_unit '''
>
>
>
> stmt_set = t_env.create_statement_set()
> stmt_set.add_insert_sql(sql1)
> stmt_set.add_insert_sql(sql2)
>
>
> stmt_set.execute().get_job_client().get_job_execution_result().result()


Re: flink 1.11 cdc: kafka中存了canal-json格式的多张表信息,需要按表解析做处理,sink至不同的下游,要怎么支持?

2020-07-21 文章 godfrey he
http://apache-flink.147419.n8.nabble.com/flink-1-10-sql-kafka-format-json-schema-json-object-tt4665.html
 这个邮件里提到了类似的问题。

https://issues.apache.org/jira/browse/FLINK-18002 这个issue完成后(1.12),你可以将
“data”,“mysqlType”等格式不确定的字段定义为String类型,
下游通过udf自己再解析对应的json


Best,
Godfrey

jindy_liu <286729...@qq.com> 于2020年7月21日周二 下午12:37写道:

> 例如:
>
> mysql表:
> CREATE TABLE `test` (
>   `id` int(11) NOT NULL,
>   `name` varchar(255) NOT NULL,
>   `time` datetime NOT NULL,
>   `status` int(11) NOT NULL,
>   PRIMARY KEY (`id`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>
> CREATE TABLE `status` (
>   `id` int(11) NOT NULL,
>   `name` varchar(255) NOT NULL,
>   PRIMARY KEY (`id`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>
> kafka中数据:
> // 表test 中insert事件
> {"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03
>
> 18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}
>
> //表status 中的事件
>
> {"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}
>
> 如何由于kafka中的json动态的变化的,比如新增一个表,如何能转成应对的RowData,
> 感觉无法直接用JsonRowDeserializationSchema或CanalJsonDeserializationSchema来做处理。
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: SQL 报错只有 flink runtime 的 NPE

2020-07-20 文章 godfrey he
看不到图片信息,换一个图床工具上传图片吧

Luan Cooper  于2020年7月17日周五 下午4:11写道:

> 附一个 Job Graph 信息,在 Cal 处挂了
> [image: image.png]
>
> On Fri, Jul 17, 2020 at 4:01 PM Luan Cooper  wrote:
>
>> 实际有 20 左右个字段,用到的 UDF 有 COALESCE / CAST / JSON_PATH / TIMESTAMP 类
>> *是指 UDF 返回了 NULL 导致的吗?*
>>
>>
>> On Fri, Jul 17, 2020 at 2:54 PM godfrey he  wrote:
>>
>>> udf_xxx的逻辑是啥?
>>>
>>>
>>> Luan Cooper  于2020年7月17日周五 下午2:40写道:
>>>
>>> > Hi
>>> >
>>> > 我有这么一个 SQL
>>> > INSERT INTO es
>>> > SELECT
>>> > a,
>>> > udf_xxx(b)
>>> > FROM mongo_oplog -- 自定义 TableFactory
>>> >
>>> > Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码
>>> Exception,可以稳定重现
>>> >
>>> > LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1)
>>> > (bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.
>>> >
>>> > java.lang.NullPointerException
>>> >
>>> > at StreamExecCalc$8016.split$7938$(Unknown Source)
>>> >
>>> > at StreamExecCalc$8016.processElement(Unknown Source)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>>> >
>>> > at
>>> > org.apache.flink.streaming.runtime.io
>>> > .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>>> >
>>> > at
>>> > org.apache.flink.streaming.runtime.io
>>> > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>>> >
>>> > at
>>> > org.apache.flink.streaming.runtime.io
>>> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>> >
>>> > at
>>> >
>>> >
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>> >
>>> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>> >
>>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>> >
>>> > at java.lang.Thread.run(Thread.java:748)
>>> >
>>> > 请问这种怎样情况排查问题?
>>> > 有任何线索都可以
>>> >
>>> > 感谢
>>> >
>>>
>>


Re: flink sink到kafka

2020-07-19 文章 godfrey he
如果你是用flink sql的,可以通过DDL的方式来定义kafka sink,参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

Best,
Godfrey

smq <374060...@qq.com> 于2020年7月19日周日 下午9:36写道:

> 大家好,我想通过avro格式sink到kafka,请问该怎么实现,官网上没找到相关方法。


Re: FlinkSQL 任务提交后 任务名称问题

2020-07-18 文章 godfrey he
hi Evan,
感谢反馈,目前已经有一个issue [1]在跟踪该问题,可以关注后续进展

[1] https://issues.apache.org/jira/browse/FLINK-18545


Best,
Godfrey

Jeff Zhang  于2020年7月18日周六 下午9:52写道:

> 在zeppelin中你可以指定insert 语句的job name,如下图,(对Zeppelin感兴趣的,可以加入钉钉群:32803524)
>
> %flink.ssql(jobName="my job")
>
> insert into sink_kafka select status, direction, cast(event_ts/10
> as timestamp(3)) from source_kafka where status <> 'foo'
>
> [image: image.png]
>
> Evan  于2020年7月18日周六 下午5:47写道:
>
>> 代码大概是这样子的,一张kafka source表,一张es Sink表,最后通过tableEnv.executeSql("insert into
>> esSinkTable select ... from kafkaSourceTable")执行
>> 任务提交后任务名称为“inset-into_某某catalog_某某database.某某Table”
>>
>>
>> 这样很不友好啊,能不能我自己指定任务名称呢?
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Re: Re:Re: flink 1.11任务提交的问题

2020-07-17 文章 godfrey he
是的。目前按照你的写法做不到只提交一个job了

sunfulin  于2020年7月17日周五 下午3:11写道:

>
>
>
> hi,
> 再问下,这个方案还是会提交两个job吧?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-17 14:36:19,"godfrey he"  写道:
> >做不到,1.11里把 StreamExecutionEnvironment.execute 和
> >StreamTableEnvironment.execute 的逻辑已经切分干净了。
> >有个改动比较小的方案可以参考:可以在原来的逻辑的基础上,把两种提交job的方式放到两个不同的类中,其他的逻辑放到另外一个类共性。
> >
> >sunfulin  于2020年7月17日周五 下午2:00写道:
> >
> >> hi,
> >> 补充一下,1.10版本的代码使用sqlUpdate +
> >>
> table2datastream,并通过StreamExecutionEnvironment.execute来提交。我回滚到1.10版本的代码后,因为我看1.11版本里如果使用sqlUpdate执行insertInto,必须使用StreamTableEnvironment.execute来提交。现在我的问题就是这个:我想通过一个job来提交。现在有机制可以做不?在1.11版本里执行。因为之前的job逻辑较为复杂,做拆分的话还有点麻烦。
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-17 13:55:21,"sunfulin"  写道:
> >>
> >>
> >>
> >>
> >> hi,
> >>
> 感谢回复。这个机制我理解了。想了解下,有办法在1.11里仍然使用1.10版本的作业提交机制么?我现在虽然把代码回滚到1.10版本的逻辑,但是提交作业仍然有问题:比如我如果不执行env.execute,那么table
> >> to DataStream的语句不会生成拓扑。
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-17 12:09:20,"godfrey he"  写道:
> >> >hi sunfulin,
> >> >目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
> >> >即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
> >> >虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。
> >> >
> >> >Best,
> >> >Godfrey
> >> >
> >> >Leonard Xu  于2020年7月17日周五 上午12:12写道:
> >> >
> >> >> Hi,
> >> >>
> >> >> 我理解目前好像做不到, cc: godfrey 大佬看看
> >> >>
> >> >> 祝好,
> >> >> Leonard Xu
> >> >>
> >> >> > 在 2020年7月16日,23:08,sunfulin  写道:
> >> >> >
> >> >> > hi,
> >> >> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
> >> >> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
> >> >> > 通过StreamExecutionEnvironment.execute提交,yarn
> >> >> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
> >> >>
> >> >>
> >>
> >>
> >>
> >>
> >>
> >>
>


Re: flink-1.11 DDL 写入hdfs问题 Cannot instantiate user function

2020-07-17 文章 godfrey he
第二个问题的异常栈是啥?

kcz <573693...@qq.com> 于2020年7月17日周五 下午2:17写道:

> 第一个bug提示只需要
> classloader.resolve-order: parent-first
> 第二个bug采用了parquet还没解决
>
>
> -- 原始邮件 --
> 发件人:
>   "kcz"
> <
> 573693...@qq.com>;
> 发送时间: 2020年7月17日(星期五) 中午1:32
> 收件人: "user-zh"
> 主题: flink-1.11 DDL 写入hdfs问题 Cannot instantiate user function
>
>
>
> standalone
> lib  jar包如下
> flink-connector-hive_2.11-1.11.0.jar       
> flink-json-1.11.0.jar             
>          
> flink-sql-connector-kafka_2.12-1.11.0.jar  log4j-api-2.12.1.jar
> flink-csv-1.11.0.jar             
>           flink-parquet_2.11-1.11.0.jar 
>              
> flink-table_2.11-1.11.0.jar             
>   log4j-core-2.12.1.jar
> flink-dist_2.11-1.11.0.jar             
>     flink-shaded-hadoop-2-uber-2.7.2.11-9.0.jar 
> flink-table-blink_2.11-1.11.0.jar         
> log4j-slf4j-impl-2.12.1.jar
> flink-hadoop-compatibility_2.11-1.11.0.jar 
> flink-shaded-zookeeper-3.4.14.jar           
> log4j-1.2-api-2.12.1.jar
>
>
>
>
>
> 代码如下:idea下不报错
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> env.setParallelism(1);
> env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
> // 同一时间只允许进行一个检查点
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);;
> env.setStateBackend(new FsStateBackend(path));
>
> tableEnv.executeSql("CREATE TABLE source_table (\n" +
> "\thost STRING,\n" +
> "\turl STRING,\n" +
> "\tpublic_date STRING\n" +
> ") WITH (\n" +
> "\t'connector.type' = 'kafka',\n" +
> "\t'connector.version' = 'universal',\n" +
> "\t'connector.startup-mode' = 'latest-offset',\n" +
> "\t'connector.topic' = 'test_flink_1.11',\n" +
> "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
> "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n"
> +
> "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n"
> +
> "\t'update-mode' = 'append',\n" +
> "\t'format.type' = 'json',\n" +
> "\t'format.derive-schema' = 'true'\n" +
> ")");
>
> tableEnv.executeSql("CREATE TABLE fs_table (\n" +
> "  host STRING,\n" +
> "  url STRING,\n" +
> "  public_date STRING\n" +
> ") PARTITIONED BY (public_date) WITH (\n" +
> "  'connector'='filesystem',\n" +
> "  'path'='path',\n" +
> "  'format'='json',\n" +
> "  'sink.partition-commit.delay'='0s',\n" +
> "  'sink.partition-commit.policy.kind'='success-file'\n" +
> ")");
>
> tableEnv.executeSql("INSERT INTO  fs_table SELECT host, url,
> DATE_FORMAT(public_date, '-MM-dd') FROM source_table");
> TableResult result = tableEnv.executeSql("SELECT * FROM fs_table ");
> result.print();
> 报错如下
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
>     at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
>     at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>     at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: cannot assign instance of
> org.apache.commons.collections.map.LinkedMap to field
>
>
> 第二个bug
> sink到hdfs时候,采用parquet时候,lib下面有parquet包,pom里面是provided,但是会提示这个error,也试过pom里面不是provided,还是不OK


Re: SQL 报错只有 flink runtime 的 NPE

2020-07-16 文章 godfrey he
udf_xxx的逻辑是啥?


Luan Cooper  于2020年7月17日周五 下午2:40写道:

> Hi
>
> 我有这么一个 SQL
> INSERT INTO es
> SELECT
> a,
> udf_xxx(b)
> FROM mongo_oplog -- 自定义 TableFactory
>
> Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码 Exception,可以稳定重现
>
> LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1)
> (bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.
>
> java.lang.NullPointerException
>
> at StreamExecCalc$8016.split$7938$(Unknown Source)
>
> at StreamExecCalc$8016.processElement(Unknown Source)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>
> at
> org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>
> at
> org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>
> at
> org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>
> at java.lang.Thread.run(Thread.java:748)
>
> 请问这种怎样情况排查问题?
> 有任何线索都可以
>
> 感谢
>


Re: Re:Re: flink 1.11任务提交的问题

2020-07-16 文章 godfrey he
做不到,1.11里把 StreamExecutionEnvironment.execute 和
StreamTableEnvironment.execute 的逻辑已经切分干净了。
有个改动比较小的方案可以参考:可以在原来的逻辑的基础上,把两种提交job的方式放到两个不同的类中,其他的逻辑放到另外一个类共性。

sunfulin  于2020年7月17日周五 下午2:00写道:

> hi,
> 补充一下,1.10版本的代码使用sqlUpdate +
> table2datastream,并通过StreamExecutionEnvironment.execute来提交。我回滚到1.10版本的代码后,因为我看1.11版本里如果使用sqlUpdate执行insertInto,必须使用StreamTableEnvironment.execute来提交。现在我的问题就是这个:我想通过一个job来提交。现在有机制可以做不?在1.11版本里执行。因为之前的job逻辑较为复杂,做拆分的话还有点麻烦。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-17 13:55:21,"sunfulin"  写道:
>
>
>
>
> hi,
> 感谢回复。这个机制我理解了。想了解下,有办法在1.11里仍然使用1.10版本的作业提交机制么?我现在虽然把代码回滚到1.10版本的逻辑,但是提交作业仍然有问题:比如我如果不执行env.execute,那么table
> to DataStream的语句不会生成拓扑。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-17 12:09:20,"godfrey he"  写道:
> >hi sunfulin,
> >目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
> >即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
> >虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。
> >
> >Best,
> >Godfrey
> >
> >Leonard Xu  于2020年7月17日周五 上午12:12写道:
> >
> >> Hi,
> >>
> >> 我理解目前好像做不到, cc: godfrey 大佬看看
> >>
> >> 祝好,
> >> Leonard Xu
> >>
> >> > 在 2020年7月16日,23:08,sunfulin  写道:
> >> >
> >> > hi,
> >> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
> >> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
> >> > 通过StreamExecutionEnvironment.execute提交,yarn
> >> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
> >>
> >>
>
>
>
>
>
>


Re: flink 1.11任务提交的问题

2020-07-16 文章 godfrey he
hi sunfulin,
目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。

Best,
Godfrey

Leonard Xu  于2020年7月17日周五 上午12:12写道:

> Hi,
>
> 我理解目前好像做不到, cc: godfrey 大佬看看
>
> 祝好,
> Leonard Xu
>
> > 在 2020年7月16日,23:08,sunfulin  写道:
> >
> > hi,
> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
> > 通过StreamExecutionEnvironment.execute提交,yarn
> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
>
>


Re: flink 1.11 checkpoint使用

2020-07-16 文章 godfrey he
为什么要 GROUP BY id,name ,description, weight ?
直接 "INSERT INTO sink SELECT  id,name ,description, weight FROM
debezium_source" 不能满足需求?

曹武 <14701319...@163.com> 于2020年7月16日周四 下午9:30写道:

> 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
> 从checkpoint恢复以后,新来op=d的数据会删除失败
> 重启命令:./bin/flink run -m yarn-cluster  /root/bigdata-flink-1.0.jar -s
>
> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
> 代码:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //
> 最大允许同时出现几个CheckPoint
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); //
> 最小得间隔时间
> env.getCheckpointConfig().setPreferCheckpointForRecovery(true); //
> 是否倾向于用CheckPoint做故障恢复
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
> //
> 容忍多少次CheckPoint失败
> //Checkpoint文件清理策略
>
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> //Checkpoint外部文件路径
> env.setStateBackend(new FsStateBackend(new
> URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
> TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> settings);
> String sourceDDL = String.format(
> "CREATE TABLE debezium_source (" +
> " id INT NOT NULL," +
> " name STRING," +
> " description STRING," +
> " weight Double" +
> ") WITH (" +
> " 'connector' = 'kafka-0.11'," +
> " 'topic' = '%s'," +
> " 'properties.bootstrap.servers' = '%s'," +
> " 'scan.startup.mode' = 'group-offsets'," +
> " 'format' = 'debezium-json'" +
> ")", "ddd", " 172.22.20.206:9092");
> String sinkDDL = "CREATE TABLE sink (" +
> " id INT NOT NULL," +
> " name STRING," +
> " description STRING," +
> " weight Double," +
> " PRIMARY KEY (id,name, description,weight) NOT ENFORCED "
> +
> ") WITH (" +
> " 'connector' = 'jdbc'," +
> " 'url' =
> 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
> " 'table-name' = 'products'," +
> " 'driver'= 'com.mysql.cj.jdbc.Driver'," +
> " 'username'='DataPip'," +
> " 'password'='DataPip'" +
> ")";
> String dml = "INSERT INTO sink SELECT  id,name ,description, weight
> FROM debezium_source GROUP BY id,name ,description, weight";
> tEnv.executeSql(sourceDDL);
> tEnv.executeSql(sinkDDL);
> tEnv.executeSql(dml);
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 文章 godfrey he
通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors

Best,
Godfrey

wangl...@geekplus.com.cn  于2020年7月16日周四 下午4:02写道:

> 比如:
>
> CREATE TABLE my_table (
>   id BIGINT,
>  first_name STRING,
>  last_name STRING,
>  email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
>
> 最终解析 debezium-json 应该是
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> 下面的代码
> 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>


Re: [sql-client] 通过sql-client提交sql怎么设置checkpointing.interval

2020-07-15 文章 godfrey he
现在还不支持在sql-client-defaults.yaml 里配置 checkpointing.interval,
你可以配置在flink-conf.yaml里

Harold.Miao  于2020年7月16日周四 下午1:27写道:

> hi flink users
>
> 通过sql-client提交sql怎么设置checkpointing.interval?
> 我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。
> 谢谢
>
>
>
> --
>
> Best Regards,
> Harold Miao
>


Re: Flink-1.11内置connector测试问题求解

2020-07-15 文章 godfrey he
目前 1.11 版本中的 tableResult.print 只支持 exactly once 语义,需要配置 checkpoint。

1.12 里准备支持 at least once 语义,用户可以不用配置 checkpoint。目前 pr [1] 正在reivew 。

[1] https://github.com/apache/flink/pull/12867

Best,
Godfrey

Jingsong Li  于2020年7月16日周四 上午11:36写道:

>  tableResult.print需要有checkpoint
>
> Best,
> Jingsong
>
> On Thu, Jul 16, 2020 at 11:31 AM amen...@163.com  wrote:
>
> > hi, everyone
> >
> > 小白在测试flink
> >
> 1.11新特性新内置的三个connector时,在本地创建图片[1]中的任务并进行数据打印时,控制台只打印了表schema,而没有按内置的datagen
> > connector规则产生数据,请问可能是什么原因呢?谢谢解答!
> >
> >
> > [1] https://postimg.cc/PprT9XV6
> >
> > best,
> > amenhub
> >
> >
> >
> > amen...@163.com
> >
>
>
> --
> Best, Jingsong Lee
>


Re: flink 1.11运算结果存mysql出错

2020-07-13 文章 godfrey he
1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业
(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
或 StreamExecutionEnvironment.execute()

TableEnvironment.executeSql() 和 StatementSet.execute()
提交的作业都是异步的,如果是在本地测试的话,不会等有最终结果才会推出。针对这个问题,1.12里准备引入 await 方法
[3],代码还在review中。

TableResult是用来描述一个statement执行的结果。对于SELECT和INSERT,TableResult中还包含了JobClient
[4]
用来操作对应的job,例如获取job状态,cancel作业,等待作业结束等。TableResult还可以collect方法拿到statement执行的schema和结果数据,例如
select/show的结果。


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset
[3] https://issues.apache.org/jira/browse/FLINK-18337
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API

Best,
Godfrey


小学生 <201782...@qq.com> 于2020年7月13日周一 下午9:12写道:

> 嗯嗯,尝试了,这下没问题了,想问下这个TableResult对象,设计的目的是啥呢,不是特别懂呢,谢谢!
>


Re: flink 1.11 sql作业提交JM报错

2020-07-12 文章 godfrey he
hi sunfulin,

1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业
(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
或 StreamExecutionEnvironment.execute()

详细可以参考 [1] [2]



对于 “No operators defined in streaming topology.”,如果使用
TableEnvironment.executeSql() 或者 StatementSet.execute() 方法提交的作业后再调用
StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute()
提交作业,就会出现前面的错误。

对于
“是不是不推荐在作业里同时使用executeSQL和StatementSet.execute?”,这个答案是no。executeSql和StatementSet不会相互干扰。对于出现的错误,能给一个更详细的提交作业的流程描述吗?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset

Best,
Godfrey

Leonard Xu  于2020年7月12日周日 下午1:48写道:

> HI, fulin
>
> 能大致贴下代码吗?能复现异常即可。简单说下这两个方法,
>  TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是
> DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink
> job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …)
> 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。
> Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法,
> 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert
> tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”),
> TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job,
> 这应该不是用户需要的。
> 具体使用根据你的需要来使用。
>
>
> Best,
> Leonard Xu
>
>
> 在 2020年7月11日,22:24,sunfulin  写道:
>
> statementset.execute
>
>
>


Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-08 文章 godfrey he
可以通过 StreamExecutionEnvironment#executeAsync 提交作业,返回 JobClient [1], 通过
JobClient 可以 cancel 作业,获取 job status。

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API

Best,
Godfrey

Evan  于2020年7月9日周四 上午9:40写道:

> 这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming
> API有没有提供类似的接口,调用后就能停止这个Stream作业呢?


Re: Flink SQL如何将多个表的查询结果(列不同)聚合成一张表

2020-07-08 文章 godfrey he
select a.table_tmp1.r1 / a.table_tmp2.r2
这个是对同一行的数据进行操作,所以你需要先对table_tmp1和table_tmp2做一个join,将两个表的数据根据条件合并成一张表。


zilong xiao  于2020年7月8日周三 下午8:55写道:

> 列如下面这样,需要查询table1 & table2,分别查询不同的字段
> 在最外层做比值,flink貌似语法检查不通过,应该怎么写这样的SQL呢,有前辈可以指导下不~
> select a.table_tmp1.r1 / a.table_tmp2.r2 as value0 from
> (
> (SELECT r1 FROM table1) AS table_tmp1, (SELECT r2 FROM table2) AS
> table_tmp2,
> )as a
>


Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-08 文章 godfrey he
1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 方法是直接执行sql作业
(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
或 StreamExecutionEnvironment.execute()

详细可以参考 [1] [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset

Best,
Godfrey

Zhou Zach  于2020年7月8日周三 下午4:19写道:

> 去掉就好了,感谢解答
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-08 16:07:17,"Jingsong Li"  写道:
> >Hi,
> >
> >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。
> >
> >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")"
> >并没有真正的物理节点。你不用再调用了。
> >
> >Best,
> >Jingsong
> >
> >On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach  wrote:
> >
> >>
> >>
> >>
> >> 代码结构改成这样的了:
> >>
> >>
> >>
> >>
> >> val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
> >>
> >> val blinkEnvSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >>
> >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
> >> blinkEnvSettings)
> >>
> >>
> >>
> >>
> >>
> >> streamExecutionEnv.execute("from kafka sink hbase")
> >>
> >>
> >>
> >>
> >> 还是报一样的错
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-08 15:40:41,"夏帅"  写道:
> >> >你好,
> >> >可以看看你的代码结构是不是以下这种
> >> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> >> >val bsSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
> >> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
> >> >  ..
> >> >tableEnv.execute("")
> >> >如果是的话,可以尝试使用bsEnv.execute("")
> >> >1.11对于两者的execute代码实现有改动
> >> >
> >> >
> >> >--
> >> >发件人:Zhou Zach 
> >> >发送时间:2020年7月8日(星期三) 15:30
> >> >收件人:Flink user-zh mailing list 
> >> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming
> topology
> >> >
> >> >代码在flink
> >>
> 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
> >> >Exception in thread "main" java.lang.IllegalStateException: No
> operators
> >> defined in streaming topology. Cannot generate StreamGraph.
> >> >at
> >>
> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
> >> >at
> >>
> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
> >> >at
> >>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> >> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
> >> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)
> >> >
> >> >
> >> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。
> >> >
> >> >
> >> >
> >> >
> >> >query:
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|CREATE TABLE `user` (
> >> >|uid BIGINT,
> >> >|sex VARCHAR,
> >> >|age INT,
> >> >|created_time TIMESTAMP(3),
> >> >|WATERMARK FOR created_time as created_time - INTERVAL '3'
> >> SECOND
> >> >|) WITH (
> >> >|'connector.type' = 'kafka',
> >> >|'connector.version' = 'universal',
> >> >|-- 'connector.topic' = 'user',
> >> >|'connector.topic' = 'user_long',
> >> >|'connector.startup-mode' = 'latest-offset',
> >> >|'connector.properties.group.id' = 'user_flink',
> >> >|'format.type' = 'json',
> >> >|'format.derive-schema' = 'true'
> >> >|)
> >> >|""".stripMargin)
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|CREATE TABLE user_hbase3(
> >> >|rowkey BIGINT,
> >> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
> >> >|) WITH (
> >> >|'connector.type' = 'hbase',
> >> >|'connector.version' = '2.1.0',
> >> >|'connector.table-name' = 'user_hbase2',
> >> >|'connector.zookeeper.znode.parent' = '/hbase',
> >> >|'connector.write.buffer-flush.max-size' = '10mb',
> >> >|'connector.write.buffer-flush.max-rows' = '1000',
> >> >|'connector.write.buffer-flush.interval' = '2s'
> >> >|)
> >> >|""".stripMargin)
> >> >
> >> >
> >> >streamTableEnv.executeSql(
> >> >  """
> >> >|
> >> >|insert into user_hbase3
> >> >|SELECT uid,
> >> >|
> >> >|  ROW(sex, age, created_time ) as cf
> >> >|  FROM  (se

Re: pyflink数据查询

2020-06-15 文章 godfrey he
hi jack,jincheng

Flink 1.11 支持直接将select的结果collect到本地,例如:
CloseableIterator it = tEnv.executeSql("select ...").collect();
while(it.hasNext()) {
   it.next() 
}

但是 pyflink 还没有引入 collect() 接口。(后续会完善?@jincheng)

但是1.11的TableResult#collect实现对流的query支持不完整(只支持append
only的query),master已经完整支持。

可以参照 jincheng 的意见,(或者结合 TableResult#collect 的实现),完成一个自己的 sink 也可以。

Best,
Godfrey



jincheng sun  于2020年6月15日周一 下午4:14写道:

> 你好 Jack,
>
> >  pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,
> 我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询
>
> 我理解你上面说的 【直接作为结果】+ 【web接口查询】已经包含了“sink”的动作。只是这个“sink” 是这样的实现而已。对于您的场景:
> 1. 如果您想直接将结果不落地(不存储)执行推送的 web页面,可以自定义一个Web Socket的Sink。
> 2. 如果您不是想直接推送到web页面,而是通过查询拉取结果,那么您上面说的
> 【直接作为结果】这句话就要描述一下,您想怎样作为结果?我理解是要落盘的(持久化),所以这里持久化本质也是一个sink。Flink可以支持很多中sink,比如:数据库,文件系统,消息队列等等。您可以参考官方文档:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
>
> 如果上面回复 没有解决你的问题,欢迎随时反馈~~
>
> Best,
> Jincheng
>
>
>
> Jeff Zhang  于2020年6月9日周二 下午5:39写道:
>
>> 可以用zeppelin的z.show 来查询job结果。这边有pyflink在zeppelin上的入门教程
>> https://www.bilibili.com/video/BV1Te411W73b?p=20
>> 可以加入钉钉群讨论:30022475
>>
>>
>>
>> jack  于2020年6月9日周二 下午5:28写道:
>>
>>> 问题请教:
>>> 描述: pyflink 从source通过sql对数据进行查询聚合等操作
>>> 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
>>>
>>> flink能否实现这样的方式?
>>> 感谢
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: flink sql字段重命名问题

2020-06-12 文章 godfrey he
hi,如 benchao 所说,SELECT XX AS YY 和Table API的renameColumns等价。
而且这些名字仅用于sql解析和优化阶段,实际执行的时候不会使用字段名。

Best,
Godfrey

Benchao Li  于2020年6月12日周五 下午6:36写道:

> 直接用SELECT XX AS YY就等价于Table API的renameColumns了吧。
>
> naisili Yuan  于2020年6月12日周五 下午6:23写道:
>
> > Hi all
> > 想请教下,flink使用sql的时候,字段重命名如何解决呢?看到了table api中的renameColumes接口,flink
> > sql的没有看到明确的接口
> > 我自己试了一下在创建表的sql语句中就加入name_alias  AS
> > name,发现可行,但是这样引入的字段,会不会影响效率,希望能获得解答,谢谢!
> >
>


Re: flink 1.10SQL 报错问题求教

2020-06-10 文章 godfrey he
hi chenkaibit

欢迎将fix贡献回社区


chenkaibit  于2020年6月9日周二 上午10:34写道:

> 我也遇到过这个问题,这个可能是 checkpoint 的 bug,我修改了下 flink 源码后不报错了,可以参考下这个patch
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
>
>
> 在 2020-06-05 15:06:48,"hb" <343122...@163.com> 写道:
> >Flink SQL 作业, 开始运行正常, checkpoint也正常, 没有大状态(状态19.7KB),
> >但是执行一段时间后,开始报错, 然后恢复执行个几秒,也马上报这个错,然后报错恢复不停循环.
> >哪位帮忙看看,不胜感激.
> >
> >
> >2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) ->
> SourceConversion(table=[default_catalog.default_database.user_visit_trace,
> source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]],
> fields=[userId, utp, utrp, extendFields, requestTime]) ->
> Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime],
> where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")
> AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'MMdd') >
> _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS
> NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)],
> joinType=[InnerJoin], async=[false], lookup=[user_id=userId],
> where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id,
> user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) /
> 1000) FROM_UNIXTIME _UTF-16LE'MMdd') CONCAT
> _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT
> _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) ->
> SinkConversionToTuple2 -> Sink: Unnamed (1/8)
> (ad7a2d51beccbb39f83ac2c16b923bd0) switched from RUNNING to FAILED.
> >java.lang.Exception: Could not perform checkpoint 401 for operator
> Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) ->
> SourceConversion(table=[default_catalog.default_database.user_visit_trace,
> source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]],
> fields=[userId, utp, utrp, extendFields, requestTime]) ->
> Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime],
> where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")
> AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'MMdd') >
> _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS
> NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)],
> joinType=[InnerJoin], async=[false], lookup=[user_id=userId],
> where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id,
> user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) /
> 1000) FROM_UNIXTIME _UTF-16LE'MMdd') CONCAT
> _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT
> _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) ->
> SinkConversionToTuple2 -> Sink: Unnamed (1/8).
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$595/1311782208.call(Unknown
> Source)
> >at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> >at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> >at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> >at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >at java.lang.Thread.run(Thread.java:745)
> >Caused by: java.lang.NullPointerException
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$597/1708409807.run(Unknown
> Source)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> >... 12 more

Re: flink 1.10SQL 报错问题求教

2020-06-07 文章 godfrey he
hi 请问你用的flink是哪个版本?StreamTask这个类里报了NPE,感觉是bug。

hb <343122...@163.com> 于2020年6月5日周五 下午3:07写道:

> Flink SQL 作业, 开始运行正常, checkpoint也正常, 没有大状态(状态19.7KB),
> 但是执行一段时间后,开始报错, 然后恢复执行个几秒,也马上报这个错,然后报错恢复不停循环.
> 哪位帮忙看看,不胜感激.
>
>
> 2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task -
> Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) ->
> SourceConversion(table=[default_catalog.default_database.user_visit_trace,
> source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]],
> fields=[userId, utp, utrp, extendFields, requestTime]) ->
> Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime],
> where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")
> AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'MMdd') >
> _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS
> NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)],
> joinType=[InnerJoin], async=[false], lookup=[user_id=userId],
> where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id,
> user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) /
> 1000) FROM_UNIXTIME _UTF-16LE'MMdd') CONCAT
> _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT
> _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) ->
> SinkConversionToTuple2 -> Sink: Unnamed (1/8)
> (ad7a2d51beccbb39f83ac2c16b923bd0) switched from RUNNING to FAILED.
> java.lang.Exception: Could not perform checkpoint 401 for operator Source:
> KafkaTableSource(userId, utp, utrp, extendFields, requestTime) ->
> SourceConversion(table=[default_catalog.default_database.user_visit_trace,
> source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]],
> fields=[userId, utp, utrp, extendFields, requestTime]) ->
> Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime],
> where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")
> AND (((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'MMdd') >
> _UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS
> NOT NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)],
> joinType=[InnerJoin], async=[false], lookup=[user_id=userId],
> where=[shop_id IS NOT NULL], select=[item_id, userId, requestTime, shop_id,
> user_id]) -> Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) /
> 1000) FROM_UNIXTIME _UTF-16LE'MMdd') CONCAT
> _UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT
> _UTF-16LE':' CONCAT item_id) AS redisKey, requestTime AS fieldName]) ->
> SinkConversionToTuple2 -> Sink: Unnamed (1/8).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$595/1311782208.call(Unknown
> Source)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$597/1708409807.run(Unknown
> Source)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> ... 12 more
> 2020-06-05 14:13:19,795 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(userId, utp,
> utrp, extendFields, requestTime) ->
> SourceConversion(table=[

Re: Flink SQL 子查询优化问题

2020-06-04 文章 godfrey he
hi Yichao,

最好的解决方式是在code gen的时候支持表达式复用,后续会慢慢加上这些优化。

Best,
Godfrey

1048262223 <1048262...@qq.com> 于2020年6月4日周四 下午9:52写道:

> Hi
>
>
> 感谢答复,我明天使用下这种方式
> 社区也有同学提供了udf使用udtf代替的方法
> 我会尝试使用上述两种方式实现下
>
>
> 还有想问下之后有对这部分做优化的计划吗?
>
>
> Best,
> Yichao Yang
>
>
>
>
>
> -- 原始邮件 --
> 发件人: godfrey he  发送时间: 2020年6月4日 21:17
> 收件人: user-zh  主题: 回复:Flink SQL 子查询优化问题
>
>
>
> hi Yichao,
>
> 目前 planner 会 try best 的将两个相邻的 project 节点合并在一起,除非两个project被分开。
> 就像你上面的那种做法。但是加一个group by的执行代价比较高。
> 对于blink planner 而言,*有一个绕的办法*,可以在子查询的结果加一个print sink(可以ignore输出),
> 利用多sink的优化特性,将两个project分开,从而阻止优化器将两个project合并。
>
> Best,
> Godfrey
>
>
>
> 1048262223 <1048262...@qq.com> 于2020年6月4日周四 下午4:56写道:
>
> > Hi all
> >
> >
> > 版本说明:
> > Flink 版本:1.10
> > Planner:old planner&nbsp;/ blink planner
> >
> >
> >
> > 程序说明(Flink SQL):
> >
> > source:每隔一秒输出Tuple2.of(1, "{\"name\": \"a\"}");
> >
> >
> >
> >
> > query:select a.id, a.name, a.name from (select id,
> body_json_to_map(name)
> > as name from data) a
> >
> >
> >
> >
> > sink:print
> >
> >
> >
> >
> > udf:body_json_to_map
> >
> > 实现如下
> >
> > public Map >
> > &nbsp; &nbsp; logger.info("a");
> >
> > &nbsp; &nbsp; return JsonUtil.fromJsonToMap(json);
> >
> > }
> >
> >
> >
> >
> > 运行结果:
> >
> > a
> >
> > a
> >
> > 1,{name=a},{name=a}
> >
> > a
> >
> > a
> >
> > 1,{name=a},{name=a}
> >
> > a
> >
> > a
> >
> > 1,{name=a},{name=a}
> >
> >
> >
> >
> > 问题:
> >
> > 子查询其实是为了使用body_json_to_map将 json 只解析一遍减少性能消耗而做的优化
> >
> > 尝试使用了 old planner 和 blink planner 都是如上结果,查看执行计划,实际执行计划如下:
> >
> >
> >
> >
> >
> >
> > select
> >
> id,&nbsp;body_json_to_map(name),&nbsp;body_json_to_map(name)&nbsp;from
> data
> >
> >
> >
> >
> > 目前的解决方案:
> >
> > 做 group by 去把算子强行隔离开
> >
> >
> >
> >
> > 想问下各位大佬这种 flink planner 做的优化有什么其他办法解决吗?
> >
> >
> >
> >
> > Best,
> >
> > Yichao Yang


Re: flink sql upsert模式写入mysql,es等key一定是groupby之后所有字段吗

2020-06-04 文章 godfrey he
hi yihan,
如 Leonard 所说,你可以考虑使用 first_value, last_value 等聚合函数和赛选其他字段。
1.11开始支持ddl定义pk信息, 如果id在source表中也是pk字段,可以直接定义,
planner会利用该信息传递pk到sink表。

Bests,
Godfrey

Leonard Xu  于2020年6月4日周四 下午9:01写道:

> Hi,
>
> > 但这样子不能通过calcite的sqlvalidation,select后面不能有非聚合项,
>
> select后费聚合值可以通过max()或sum()来取,因为已经按照key group by了,所以取出来的非聚合值只能有一条,
>
> > 这样子key的state无限增长(比如说amount是一个随机的double数),job跑不久就会fail掉。
>
> State 可以配置ttl的,过期清理参考[1]
>
> 另外,即将发布的1.11中,支持在jdbc table 上定义primary key, 不用强制要求写upsert 的query,文档正在撰写中[2]
>
> Best,
> Leonard Xu
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time
> <
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time
> >
> [2] https://issues.apache.org/jira/browse/FLINK-17829 <
> https://issues.apache.org/jira/browse/FLINK-17829>
>


Re: 关于flinksql 与维表mysql的关联问题

2020-06-04 文章 godfrey he
hi 可以考虑使用 temporal table join [1]

Best,
Godfrey

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#join-with-a-temporal-table

小屁孩 <932460...@qq.com> 于2020年6月4日周四 下午5:51写道:

> 您的意思是open 全量预加载吗?我目前的逻辑是自己自定义的source 广播出去
> 这是我的source
>
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>
>
> import java.sql.Connection;
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import java.util.HashMap;
> import java.util.Map;
>
>
> public class GetMysqlDvcId extends RichSourceFunction Integer>> {
>
>
>     private Connection connection = null;
>     private PreparedStatement ps = null;
>     private volatile boolean isRunning = true;
>
>
>
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>           String database="db_nssa";
>           String host="212.21.12.12";
>           String password="saa!";
>           String port="3306";
>           String username="root";
>
>
>
>
>         String driver = "com.mysql.jdbc.Driver";
>         String url = "jdbc:mysql://" + host + ":" +
> port + "/" + database + "?useUnicode=true&characterEncoding=UTF-8";
>           connection =
> MySQLUtil.getConnection(driver, url, username, password);
>
>
>
>
>         if (this.connection != null) {
>             String sql = "select
> ip,device_id from sys_device";
>             ps
> =connection.prepareStatement(sql);
>         }
>     }
>
>
>     @Override
>     public void run(SourceContext ctx) throws Exception {
>         Map HashMap<>();
>         while (isRunning) {
>             ResultSet resultSet =
> ps.executeQuery();
>             while (resultSet.next()) {
>                    
> map.put(resultSet.getString("ip"),resultSet.getInt("device_id"));
>             }
>            
> System.out.println("===select alarm notify from mysql, size = {}, map =
> {}"+ map.size()+ map);
>             ctx.collect(map);
>             map.clear();
>             Thread.sleep(2000 * 60);
>         }
>
>
>     }
>
>
>
>
>     @Override
>     public void cancel() {
>         try {
>             super.close();
>             if (connection != null) {
>                 connection.close();
>             }
>             if (ps != null) {
>                 ps.close();
>             }
>         } catch (Exception e) {
>            
> System.out.println("runException:{}"+e);
>         }
>         isRunning = false;
>     }
> }
>
> -- 原始邮件 --
> 发件人: "Michael Ran" 发送时间: 2020年6月4日(星期四) 下午5:22
> 收件人: "user-zh"
> 主题: Re:关于flinksql 与维表mysql的关联问题
>
>
>
> 放到open 方法里面可以吗?
> 在 2020-06-04 14:15:05,"小屁孩" <932460...@qq.com> 写道:
> >dear:    我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据


Re: Flink SQL 子查询优化问题

2020-06-04 文章 godfrey he
hi Yichao,

目前 planner 会 try best 的将两个相邻的 project 节点合并在一起,除非两个project被分开。
就像你上面的那种做法。但是加一个group by的执行代价比较高。
对于blink planner 而言,*有一个绕的办法*,可以在子查询的结果加一个print sink(可以ignore输出),
利用多sink的优化特性,将两个project分开,从而阻止优化器将两个project合并。

Best,
Godfrey



1048262223 <1048262...@qq.com> 于2020年6月4日周四 下午4:56写道:

> Hi all
>
>
> 版本说明:
> Flink 版本:1.10
> Planner:old planner / blink planner
>
>
>
> 程序说明(Flink SQL):
>
> source:每隔一秒输出Tuple2.of(1, "{\"name\": \"a\"}");
>
>
>
>
> query:select a.id, a.name, a.name from (select id, body_json_to_map(name)
> as name from data) a
>
>
>
>
> sink:print
>
>
>
>
> udf:body_json_to_map
>
> 实现如下
>
> public Map
>     logger.info("a");
>
>     return JsonUtil.fromJsonToMap(json);
>
> }
>
>
>
>
> 运行结果:
>
> a
>
> a
>
> 1,{name=a},{name=a}
>
> a
>
> a
>
> 1,{name=a},{name=a}
>
> a
>
> a
>
> 1,{name=a},{name=a}
>
>
>
>
> 问题:
>
> 子查询其实是为了使用body_json_to_map将 json 只解析一遍减少性能消耗而做的优化
>
> 尝试使用了 old planner 和 blink planner 都是如上结果,查看执行计划,实际执行计划如下:
>
>
>
>
>
>
> select
> id, body_json_to_map(name), body_json_to_map(name) from data
>
>
>
>
> 目前的解决方案:
>
> 做 group by 去把算子强行隔离开
>
>
>
>
> 想问下各位大佬这种 flink planner 做的优化有什么其他办法解决吗?
>
>
>
>
> Best,
>
> Yichao Yang


Re: 回复: flink数据sink到mysql 是事务处理

2020-06-03 文章 godfrey he
hi greemqq...@163.com,15701181132mr@gmail.com
能详细介绍一下 幂等 处理的场景吗? 例如通过upsert语句能做到幂等

hi hdxg1101300...@163.com,
你的sink function的一个statement batch里有insert,delete等语句混合的情况?是用的是flink
sql,还是datastream?

Bests,
Godfrey

Bests,
Godfrey

Michael Ran  于2020年6月3日周三 下午8:07写道:

> 我们也会用幂等处理类似的东西。
> 1.你要么单条数据处理
> 2.要么保证每个事务之间不会出现冲突才行,比如楼上说了key by 之类的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-03 15:41:44,"1048262223" <1048262...@qq.com> 写道:
> >Hi
> >     
> 可以辛苦贴一下上下文吗,什么场景,mysql的sql是什么。可以先判断是什么导致了MySQL死锁,我理解如果可以对上游做主键keyby,那么下游同时只有一个算子在处理同一个主键。
> >
> >
> >Best,
> >Yichao Yang
> >
> >
> >
> >
> >-- 原始邮件 --
> >发件人: "Px New"<15701181132mr@gmail.com>;
> >发送时间: 2020年6月3日(星期三) 中午11:35
> >收件人: "user-zh" >
> >主题: Re: flink数据sink到mysql 是事务处理
> >
> >
> >
> >Hi 我最近在处理幂等性写入Mysql 但相关文档太少并没有实质性的操作, 所有方便参观下你这边实务写入的code吗? 非常感谢你
> >也可发code到我的email 15701181132mr@gmail.com
> >
> >1101300123  >
> >>
> >>
> >>
> 目前使用flink处理kafka数据后sink到mysql,处理完的数据是一个用户ID对应多条记录,都要插入到mysql,所以在sink中会循环list把数据组装成batch提交,所以mysql开启了事务保证一批数据的事务。现在发现程序运行几个小时候会出现mysql死锁!想了解一下大家是怎么处理sink端
> >> 多条记录的操作
> >> sink的invoke代码
> >> @Override
> >> public void invoke(Tuple5 >> List >> connection.setAutoCommit(false);
> >> List >> for (BroadBandReq rs: f4){
> >> statement.setString(1,rs.getUserId());
> >> statement.setString(2,rs.getPhoneNum());
> >> statement.setString(3,rs.getProvId());
> >> statement.addBatch();
> >> }
> >> try {
> >> statement.executeBatch();
> >> connection.commit();
> >> }catch (Exception e){
> >> LOG.info(" add data for rds ; operTag:{},
> >> userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0,
> value.f1,
> >> value.f2, value.f3,f4);
> >> connection.rollback();
> >> e.printStackTrace();
> >>
> throw new Exception(e);
> >> }
> >> }
> >>
> >>
> >>
> >>
> >> java.lang.Exception: java.sql.BatchUpdateException: Deadlock found
> when
> >> trying to get lock; try restarting transaction
> >> at
> com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
> >> at
> com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> >> at
> >>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> >> at
> >>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
> >> at
> >>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
> >> at
> >>
> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
> >> at
> >>
> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
> >> at
> >>
> com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
> >> at
> >>
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> >> at
> >>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> >> at
> >>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
> >> at

Re: pyflink window创建失败

2020-06-03 文章 godfrey he
 hi 元灵,
 这个是个已知bug: https://issues.apache.org/jira/browse/FLINK-17753,目前正在fix。

Bests,
Godfrey

元灵  于2020年6月3日周三 下午5:39写道:

> 大家好,请教个问题:
> 我在pyflink中使用SQL DDL创建kafka源,如下:
> kafka_source_ddl = """
> CREATE TABLE kafka_source_tb (
>  name VARCHAR,
>  number INT,
>  msgtime TIMESTAMP,
>  WATERMARK FOR msgtime AS msgtime
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = 'universal',
>  'connector.topic' = 'mytopic',
>  'connector.properties.zookeeper.connect' = 'localhost:2181',
>  'connector.properties.bootstrap.servers' = 'localhost:9092',
>  'format.type' = 'json',
>  'format.derive-schema' = 'true'
> )
> """
> st_env.sql_update(kafka_source_ddl)
>
>
> 在使用窗口时报错,代码如下:
> st_env.from_path("kafka_source_tb") \
>
> .window(Slide.over("10.secends").every("1.secends").on("msgtime").alias("msgtime"))
> \
>   .group_by("msgtime") \
>   .select("msgtime.start as b, msgtime.end as c, msgtime.rowtime
> as d") \
>
>
> 报错如下
> : org.apache.flink.table.api.ValidationException: A group window expects a
> time attribute for grouping in a stream environment.
> at
> org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
> at
> org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
> at
> org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
> at
> org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
> at
> org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:243)
> at
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:762)
> at
> org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:747)
>
>
> 我看到有人说直接使用api的rowtime函数好像有bug,就选了用DDL的,这个error是否相关?或者是我哪里写错了?
>
>
> 请大家帮忙看一下
> 谢谢!
>
>


Re: flink 1.9 关于回撤流的问题

2020-06-03 文章 godfrey he
hi star,
Flink 1.11 开始已经支持 table source 读取 retract 消息,update 消息。
目前支持 Debezium format,Canal format [1],其他的情况目前需要自己实现。


Best,
Godfrey

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=147427289#FLIP105:SupporttoInterpretChangeloginFlinkSQL(IntroducingDebeziumandCanalFormat)-CanalFormat


1048262223 <1048262...@qq.com> 于2020年6月3日周三 下午2:59写道:

> Hi
> Flink 中RetractStream
> 是必须要sink支持update的,kafka消息队列本身是不支持update的,所以基于sink为kafka的程序是不能做RetractStream的。
>
>
> Best,
> Yichao Yang
>
>
> -- 原始邮件 --
> 发件人: "star"<3149768...@qq.com>;
> 发送时间: 2020年6月3日(星期三) 下午2:47
> 收件人: "user-zh@flink.apache.org"
> 主题: flink 1.9 关于回撤流的问题
>
>
>
> 大家好,
>
>
>
> 在做实时统计分析的时候我将基础汇总层做成了一个回撤流(toRetractStream)输出到kafka里(因为后面很多实时报表依赖这个流,所以就输出了)
> 问题是这个kafka里到回撤流还能转成flink 的RetractStream流吗,转成后我想注册成一张表,然后基于这个表再做聚合分析
>
>
>
>
> 谢谢


Re: Flink 有支持类似MR一样支持opts参数的变量替换么?

2020-06-02 文章 godfrey he
hi shangwen, flink
支持配置 env.java.opts,env.java.opts.jobmanager,env.java.opts.taskmanager
等来配置JVM
详细请见:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jvm-and-logging-options

Best,
Godfrey


shangwen <583767...@qq.com> 于2020年6月2日周二 下午9:03写道:

> 现在一个场景是我们假设Flink的TM出现了OOM,并希望收集Dump文件,配置HeapDumpOnOutOfMemoryError,让Flink出现oom时将文件dump出来,对于MR来说,我们可以这么配置mapreduce.map.java.opts为-XX:+HeapDumpOnOutOfMemoryError
> -XX:HeapDumpPath= ,让收集的文件可以区分,目前对于Flink来说,有较好的方式能够做到这一点吗?


Re: Flink sql client 连接使用kerberos 认证的hive的问题

2020-06-02 文章 godfrey he
hi john,请问你用sql client跑的是yarn per job 还是 yarn session 模式?
security 相关的配置需要放到 flink-conf.yaml 中,是因为 sql client不负责启动一个flink cluster,
只负责提交sql job。

Best,
Godfrey

john <506269...@qq.com> 于2020年5月29日周五 下午12:09写道:

> flink version: 1.10.1
> Hive versos: 2.11
> flink-conf.yaml 安全配置:
> # security
> security.kerberos.login.use-ticket-cache: true
> security.kerberos.login.keytab: /tmp/bigdata.keytab
> security.kerberos.login.principal: bigdata 
>
> 问题描述:
> 使用bin/yarn-session.sh提交一个session集群在log中可以看到是使用bigdata登录成功。
> 2020-05-29 11:45:05,833 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: yarn.application-attempts, 4
> 2020-05-29 11:45:05,833 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: security.kerberos.login.use-ticket-cache, true
> 2020-05-29 11:45:05,833 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: security.kerberos.login.keytab, /tmp/bigdata.keytab
> 2020-05-29 11:45:05,834 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: security.kerberos.login.principal,
> bigd...@aibee.cn
> 2020-05-29 11:45:06,576 INFO
>  org.apache.hadoop.security.UserGroupInformation   - Login
> successful for user bigdata using keytab file /tmp/bigdata.keytab
> 2020-05-29 11:45:06,577 INFO
>  org.apache.flink.runtime.security.modules.HadoopModule- Hadoop
> user set to bigdata(auth:KERBEROS), credentials check status: true
>
> 但是:
> 使用sql-client.sh连接hive提交一个简单查询的时候,貌似没有读取到flink-conf.yaml里的securit配置。
> 即:还是使用我shell端的登录用户。
>
>
> 不知道sql-client-defaults.yaml如何配置才能正确的读取flink-conf.yaml,并使用这个配置文体,往yarn上提交作业。非常感谢!
>


Re: Flink1.11-release编译部署后sql-client的bug

2020-06-01 文章 godfrey he
Hi, 夏帅

感谢反馈问题,我建了一个issue https://issues.apache.org/jira/browse/FLINK-18055
,应该今天就可以fix

Best,
Godfrey

Leonard Xu  于2020年6月2日周二 下午12:13写道:

> Hi, 夏帅
>
> 感谢反馈,这应该是个bug,我 这边本地也复现了,我先看下哈
>
> 祝好,
> Leonard Xu
>
> > 在 2020年6月2日,11:57,夏帅  写道:
> >
> > 是我编译的问题么,在window下编译的
>
>


Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-06-01 文章 godfrey he
目前StreamTableEnvironment和TableEnvironment在DAG优化方面的行为自1.11开始都是一样的了,建议都使用StatementSet来支持多insert。TableEnvironment以后还会进一步的简化和整合。

Best,
Godfrey

wind.fly@outlook.com  于2020年5月28日周四 下午5:45写道:

> Hi,
>
> StreamTableEnvironment类似于TableEnvironment的Dag优化有版本规划吗?未来TableEnvironment的dag优化和StreamTableEnvironment较丰富api会不会统一?
>
> Best,
> Junbao Zhang
> 
> 发件人: Benchao Li 
> 发送时间: 2020年5月28日 17:35
> 收件人: user-zh 
> 主题: Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
>
> Hi,
>
> 时间差了13个小时这个比较奇怪,我理解不应该出现这个问题的。你是说不用TableEnvironment就不会出现这个问题么?
>
> 第二个问题,TableEnvironment目前应该是没法设置checkpoint的,这个应该是只能在StreamExecutionEnvironment来配置吧。
>
> wind.fly@outlook.com  于2020年5月28日周四
> 下午5:27写道:
>
> > Hi, Benchao:
> >
> >
> DAG图是指向了同一个问题,但是用了TableEnvironment后,发现所有的时间都差了13个小时,比如调用LOCALTIMESTAMP函数。另外问一个问题,使用TableEnvironment怎么设置checkpoint?
> >
> >
> >
> >
> > Best,
> > Junbao Zhang
> > 
> > 发件人: Benchao Li 
> > 发送时间: 2020年5月28日 17:05
> > 收件人: user-zh 
> > 主题: Re: 疑问:flink sql
> 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> >
> > 嗯,是的。这个是blink planner特有的优化,可以支持多个sink的DAG图的优化。你也可以看下Flink
> > UI上的任务DAG图,比较明显的可以看到只有一个Source,后面接了两个Sink。
> >
> > wind.fly@outlook.com  于2020年5月28日周四
> > 下午5:02写道:
> >
> > > Hi, Benchao:
> > > 谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?
> > >
> > >
> > >
> > >
> > > Best,
> > > Junbao Zhang
> > > 
> > > 发件人: Benchao Li 
> > > 发送时间: 2020年5月28日 15:59
> > > 收件人: user-zh 
> > > 主题: Re: 疑问:flink sql
> > 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> > >
> > >
> > >
> >
> 如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。
> > >
> > > wind.fly@outlook.com  于2020年5月28日周四
> > > 下午3:14写道:
> > >
> > > > Hi,all:
> > > > 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
> > > >
> > > > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> > > > tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> > > > tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
> > > >
> > > > 其中a是kafka表,connector属性为:
> > > > 'connector.properties.group.id' = 'testGroup',
> > > > 'connector.startup-mode' = 'group-offsets'
> > > >
> > > >疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> > > > offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: SQL_CLI构建流式应用参数设置

2020-04-17 文章 godfrey he
第一个问题:set execution.parallelism=10;
这样命令设置的job的默认并发度。一些算子有自己并发度的设置逻辑,不受默认并发度的影响(例如
hive的source,是根据partition数来的)。你可以在提交作业之前调用set命令来修改每个job的默认并发度。
第二个问题:1.11在 FLINK-16822[1] 被fix后,你可以通过配置项方式设置checkpoint [2]。例如:
set execution.checkpointing.mode=EXACTLY_ONCE。

[1] https://issues.apache.org/jira/browse/FLINK-16822
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#checkpointing

Best,
Godfrey

Even <452232...@qq.com> 于2020年4月17日周五 下午3:44写道:

> Hi!Nabble似乎无法注册,所以在邮件列表中无法回复。
> 关于周三问题,还有一点疑惑:
> 1、在CLI通过 set 命令,如 set
>  execution.parallelism=10 动态设置并行度,是当前CLI创建的所有任务都是这个并行度吗?
> 2、 SQL CLI 还不支持 checkpoint 的设置,这个以后会考虑支持吗?如果已考虑会在下个版本中发布吗?
> 非常感谢!
>
>
> 
> Hi Even, 
>
> 1. 目前 SQL CLI 支持通过在 sql-client-default.yaml 里设置 parallelism 
> 和 max-parallelism 来控制 CLI 任务的默认并发。或者通过 set 命令,如 set 
> execution.parallelism=10;放方式动态设置。例外,对于如果使用 blink 
> planner,可以用 table.exec.resource.default-parallelism 来配置默认并发。[1] 
> 另外 SQL CLI 还不支持 checkpoint 的设置。 
> 2. 目前 SQL CLI 默认是 in-memory catalog,在每个SQL CLI的独立进程中,不会共享。如果SQL 
> CLI挂掉,in-memory catalog 也会消失。你可以配置你的catalog为 hive catalog [1],
> 这样你创建的表会持久化到 
> hive catalog 中,多个SQL CLI使用同一个hive catalog,可以达到你说期望的共享。 
>
> [1] 
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#environment-files
>
> Best, 
> Godfrey 
>
> Even <[hidden email]> 于2020年4月15日周三 下午3:35写道: 
>
> > Hi! 
> > 请教两个问题: 
> > 1、 Flink SQL CLI 纯文本方式构建一个流式应用,在DDL语句中如何设置checkpoint和并行度这些参数? 
> > 2、 Flink SQL CLI 
> >
> 纯文本方式构建的流式应用创建的那些表,我在另外一个CLI中是无法找到这些table的,这是为什么?如果任务挂掉了,应该怎么重启,还是必须重新再构建? 


Re: 请问Flink-1.10.1 release可以在哪里下载?(无正文)

2020-04-16 文章 godfrey he
目前社区已经在讨论 release-1.10.1 RC [1] 的发布

[1]
http://mail-archives.apache.org/mod_mbox/flink-dev/202004.mbox/%3CCAM7-19K0YsejvZpfVJrvEX6_DOJ7sUViEn9nB-5zfhX8P28_9A%40mail.gmail.com%3E

Best,
Godfrey

Benchao Li  于2020年4月16日周四 下午3:06写道:

> Hi,
> Flikn 1.10.1还没有正式发布,暂时还没有地方可以直接下载。可以从源码直接编译一下~
>
> samuel@ubtrobot.com  于2020年4月16日周四 下午3:04写道:
>
> >
> >
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: FlinkSQL构建流式应用checkpoint设置

2020-04-15 文章 godfrey he
Hi Even,

1. 目前 SQL CLI 支持通过在 sql-client-default.yaml 里设置 parallelism
和 max-parallelism 来控制 CLI 任务的默认并发。或者通过 set 命令,如 set
execution.parallelism=10;放方式动态设置。例外,对于如果使用 blink
planner,可以用 table.exec.resource.default-parallelism 来配置默认并发。[1]
另外 SQL CLI 还不支持 checkpoint 的设置。
2. 目前 SQL CLI 默认是 in-memory catalog,在每个SQL CLI的独立进程中,不会共享。如果SQL
CLI挂掉,in-memory catalog 也会消失。你可以配置你的catalog为 hive catalog [1], 这样你创建的表会持久化到
hive catalog 中,多个SQL CLI使用同一个hive catalog,可以达到你说期望的共享。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#environment-files

Best,
Godfrey

Even <452232...@qq.com> 于2020年4月15日周三 下午3:35写道:

> Hi!
> 请教两个问题:
> 1、 Flink SQL CLI 纯文本方式构建一个流式应用,在DDL语句中如何设置checkpoint和并行度这些参数?
> 2、 Flink SQL CLI
> 纯文本方式构建的流式应用创建的那些表,我在另外一个CLI中是无法找到这些table的,这是为什么?如果任务挂掉了,应该怎么重启,还是必须重新再构建?


Re: Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

2020-04-02 文章 godfrey he
Hi Xinghalo,

欢迎向 sql gateway 贡献~

Best,
Godfrey

111  于2020年4月2日周四 上午11:10写道:

> Hi,
> 了解了,那我知道怎么解决了。我这边使用的是sql-gateway,看样子得在sql-gateway里面加一种表定义的语法了。
> 多谢多谢
>
>
> Best,
> Xinghalo
> 在2020年04月2日 10:52,Benchao Li 写道:
> 你写的不是维表join的语法,维表join现在用的是temporal table[1] 的方式来实现的,需要特殊的join语法:
>
> SELECT
> o.amout, o.currency, r.rate, o.amount * r.rateFROM
> Orders AS o
> JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
> ON r.currency = o.currency
>
> 此外,你看的JDBCTableSource是一个普通的bounded source,或者是batch
> source。真正的维表的代码是在JDBCLookupFunction里面的。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
>
> 111  于2020年4月2日周四 上午10:33写道:
>
> Hi,
> 试验了下貌似不行,我的sql:
>
>
> select s.*, item.product_name
> from  ( select member_id, uuid, page, cast(SPLIT_INDEX(page,
> '10.pd.item-', 1) as string) as item_id,  `time` from tgs_topic_t1  where
> page like '10.pd.item-%’ ) s
> inner join PROD_ITEM_MALL_ACTIVITY_PRODUCT item on cast(item.id as
> string) = s.item_id
> where s.item_id is not null
>
>
> 看了下代码JDBCTableSource中的实现
> String query = dialect.getSelectFromStatement( options.getTableName(),
> returnType.getFieldNames(), new String[0]);
> 构建sql的时候  conditionFields 是写死的空数组,因此肯定不会有查询条件参与进来。
> 后面真正open()连接查询的时候,也只处理了分区字段,并没有查询条件字段。
> 在2020年04月2日 10:11,Benchao Li 写道:
> Hi,
>
> 能否把你的SQL也发出来呢?
>
>
> 正常来讲,维表关联用的是join的等值条件作为关联的条件去mysql查询,然后如果还有其他的filter,会在关联之后的结果之上在做filter。如果你发现每次都是扫描全表,很有可能是你的维表join的条件写的有问题导致的。
>
> 111  于2020年4月2日周四 上午9:55写道:
>
> Hi,
> 想确认下MySQL JDBC Connector的实现细节,目前了解到的实现是:
> 1 获取查询sql中的字段和表名,拼接成select a, b, c from t
> 2 创建并行输入,如果制定了分区字段,会按照分区字段和并行度切分生成where条件,如where id between xxx and xxx
> 3 执行sql,加载到内存(不确定后面缓存的实现细节)
>
>
>
>
>
> 目前我们遇到的问题是,我们把mysql作为维表(表的量级在一千500万左右,并行度为10),没有指定分区条件(只有一个slot执行查询,其他的都没有查询任务)。
> 结果导致只有一个分区查询数据,查询的sql为select xxx from t(全表查询)。
> 可是现在并行度由于集群的限制,不能调整的过大,如何实现按需查询呢?
>
>
> 之前我们在datastream api中,是使用异步API,根据某个条件单独查询某条数据,所以速度很快;现在需要全部加载过来,资源消耗太大了。
> 不知道有没有什么优雅的解决方案?
>
>
> Best,
> Xinghalo
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: Re: flink 安装包的几个 jar 是怎么 build 出来的

2020-03-26 文章 godfrey he
目前 flink 支持 Scala 2.11 和 Scala 2.12, 默认情况下,通过 mvn package 打包出来的是包含 Scala
2.11 的包,例如  flink-table-blink_*2.11*-1.10.0.jar。
可以通过  -Dscala-2.12 指定 Scala 的版本是 2.12, 打出来的包是 flink-table-blink_*2.12*
-1.10.0.jar  这样的。

Best,
Godfrey

wangl...@geekplus.com.cn  于2020年3月26日周四 下午6:34写道:

>
> flink-table-uber-blink 下
>  mvn clean install -DskipTests -Dscala-2.12 -DskipTests
>
> 不清楚这个 -Dscala-2.12 怎样起作用的,但这样是可以直接替换掉服务器上的 jar 并正常 work的
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
> Sender: Kurt Young
> Send Time: 2020-03-26 18:15
> Receiver: user-zh
> cc: jihongchao
> Subject: Re: flink 安装包的几个 jar 是怎么 build 出来的
> flink-table-uber-blink 应该是这个module,它负责打出 blink planner 的胖包(uber jar)
>
> Best,
> Kurt
>
>
> On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
> >
> > 单机版 下载 tgz 解压,lib 目录下有好几个 jar, 比如 flink-table-blink_2.12-1.10.0.jar
> > 这个 jar 是从哪里 build 出来的呢?
> >
> > 我 clone github 上的源代码,mvn clean package
> > 我以为 flink-table/flink-table-planner-blink 目录下build 出的
> > flink-table-planner-blink_2.11-1.10.0.jar 跟发布 tgz 里的
> >  flink-table-blink_2.12-1.10.0.jar  是对应的
> > 我直接替换到安装目录下,start-cluster.sh 还可以,但 ./bin/sql-client.sh embedded 就出错了。
> >
> > 谢谢,
> > 王磊
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
>


Re: 回复: Flink JDBC Driver是否支持创建流数据表

2020-03-26 文章 godfrey he
还有一种方式是sql gateway 支持 --jar 和 --library 指定用户的jar,这种方式不需要用户将jar放到flink的lib下

godfrey he  于2020年3月25日周三 下午6:24写道:

> hi 赵峰,
>
> 你出现的这个问题,是在classpath中找不到Kafka相关TableFactory,按照zhenghua说的方式可以解决。但是现在Flink
> JDBC Driver只支持Batch模式,而Kafka table source目前只支持stream模式。
>
> Best,
> Godfrey
>
> Zhenghua Gao  于2020年3月25日周三 下午4:26写道:
>
>> 请确认一下 kafka connector 的jar包是否在 flink/lib 下。
>> 目前的报错看起来是找不到kafka connector的jar包。
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Wed, Mar 25, 2020 at 4:18 PM 赵峰  wrote:
>>
>> > 不是语法问题,我建表也没有问题,是查询报错。你有没有试查询数据或者数据写人文件表中
>> >
>> >
>> > 
>> >
>> > 参考下这个文档:
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>> > 下面的语法应该是不支持的:
>> >   'format.type' = 'csv',\n" +
>> > "'format.field-delimiter' = '|'\n"
>> >
>> > 下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90}
>> > tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
>> > + "order_no VARCHAR,\n"
>> > + "status INT\n"
>> > + ") WITH (\n"
>> > + "'connector.type' = 'kafka',\n"
>> > + "'connector.version' = 'universal',\n"
>> > + "'connector.topic' = 'wanglei_test',\n"
>> > + "'connector.startup-mode' = 'latest-offset',\n"
>> > + "'connector.properties.0.key' = 'zookeeper.connect',\n"
>> > + "'connector.properties.0.value' = 'xxx:2181',\n"
>> > + "'connector.properties.1.key' = 'bootstrap.servers',\n"
>> > + "'connector.properties.1.value' = 'xxx:9092',\n"
>> > + "'update-mode' = 'append',\n"
>> > + "'format.type' = 'json',\n"
>> > + "'format.derive-schema' = 'true'\n"
>> > + ")");
>> >
>> > 王磊
>> >
>> >
>> > wangl...@geekplus.com.cn
>> > 发件人: 赵峰
>> > 发送时间: 2020-03-24 21:28
>> > 收件人: user-zh
>> > 主题: Flink JDBC Driver是否支持创建流数据表
>> > hi
>> >
>> > Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下:
>> > Connection connection =
>> >
>> DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
>> > Statement statement = connection.createStatement();
>> > statement.executeUpdate(
>> > "CREATE TABLE table_kafka (\n" +
>> > "user_id BIGINT,\n" +
>> > "item_id BIGINT,\n" +
>> > "category_id BIGINT,\n" +
>> > "behavior STRING,\n" +
>> > "ts TIMESTAMP(3),\n" +
>> > "proctime as PROCTIME(),\n" +
>> > "WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
>> > ") WITH (\n" +
>> > "'connector.type' = 'kafka', \n" +
>> > "'connector.version' = 'universal', \n" +
>> > "'connector.topic' = 'flink_im02', \n" +
>> > "'connector.properties.group.id' = 'flink_im02_new',\n" +
>> > "'connector.startup-mode' = 'earliest-offset', \n" +
>> > "'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
>> > "'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
>> > "'format.type' = 'csv',\n" +
>> > "'format.field-delimiter' = '|'\n" +
>> > ")");
>> > ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
>> > while (rs1.next()) {
>> > System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
>> > }
>> > statement.close();
>> > connection.close();
>> > 报错:
>> > Reason: Required context properties mismatch.
>> > The matching candidates:
>> > org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> > Mismatched properties:
>> > 'connector.type' expects 'filesystem', but is 'kafka'
>> > 赵峰
>> >
>> > 
>> > Quoted from:
>> >
>> http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html
>> >
>> >
>> >
>> >
>> > 赵峰
>>
>


  1   2   >