Re: flink sql fails to write to a Hive partitioned table

2020-05-27 Posted by Leonard Xu
 
>|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, 
> `p_month` = p_month)
>|select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and 
> `p_month` = 4 

That is not how dynamic partitions are specified; the syntax is the same as in Hive. Either of the two statements below should work. The Flink docs are a bit sparse here; see [1][2].

INSERT INTO dwdCatalog.dwd.t1_copy 
 select id,name,`p_year`,`p_month` from dwdCatalog.dwd.t1 where `p_year` = 
2020 and `p_month` = 4 

INSERT INTO dwdCatalog.dwd.t1_copy 
select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 4 
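If you keep a static partition spec instead, only the non-partition columns go into the select. Wrapped in the sqlUpdate call from your mail, that variant would look roughly like this:

tableEnv.sqlUpdate(
  """
    |INSERT INTO dwdCatalog.dwd.t1_copy PARTITION (`p_year` = 2020, `p_month` = 4)
    |select id, name from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 4
    |""".stripMargin)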

Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/insert.html#examples
[2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java#L294




> On May 28, 2020, at 13:59, Zhou Zach wrote:
> 
> Thanks for the pointers, that works now.
> But switching to a dynamic-partition insert fails:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: SQL parse failed. Encountered "p_year" at line 3, column 58.
> Was expecting one of:
>"DATE" ...
>"FALSE" ...
>"INTERVAL" ...
>"NULL" ...
>"TIME" ...
>"TIMESTAMP" ...
>"TRUE" ...
>"UNKNOWN" ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
>"+" ...
>"-" ...
> 
> 
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> 
> 
> 
> 
> Query:
> tableEnv.sqlUpdate(
>  """
>|
>|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, 
> `p_month` = p_month)
>|select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and 
> `p_month` = 4
>|
>|""".stripMargin)
> 
> 
> On 2020-05-28 13:39:49, "Leonard Xu" wrote:
>> Hi,
>>>   |select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` 
>>> = 5
>> 
>> It's probably because select * brings the partition columns along too, so the field lists no longer match; list just the columns you need in the select.
>> 
>> Best,
>> Leonard Xu
>> 
>>> On May 28, 2020, at 12:57, Zhou Zach wrote:
>>> 
>>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>>> caused an error: Field types of query result and registered TableSink 
>>> dwdCatalog.dwd.t1_copy do not match.
>>> 
>>> Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, EXPR$4: 
>>> INT NOT NULL, EXPR$5: INT NOT NULL]
>>> 
>>> Sink schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT]
>>> 
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>> 
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>> 
>>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>> 
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>> 
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>> 
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>> 
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>> 
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> 
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>> 
>>> at 
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>>> 
>>> at 
>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>> 
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>> 
>>> 
>>> 
>>> 
>>> Hive partitioned tables:
>>> CREATE TABLE `dwd.t1`(
>>> `id` bigint, 
>>> `name` string)
>>> PARTITIONED BY ( 
>>> `p_year` int, 
>>> `p_month` int)
>>> 
>>> 
>>> CREATE TABLE `dwd.t1_copy`(
>>> `id` bigint, 
>>> `name` string)
>>> PARTITIONED BY ( 
>>> `p_year` int, 
>>> `p_month` 

Re:Re: flink sql fails to write to a Hive partitioned table

2020-05-27 Posted by Zhou Zach
Thanks for the pointers, that works now.
But switching to a dynamic-partition insert fails:
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: SQL parse failed. Encountered "p_year" at line 3, column 58.
Was expecting one of:
"DATE" ...
"FALSE" ...
"INTERVAL" ...
"NULL" ...
"TIME" ...
"TIMESTAMP" ...
"TRUE" ...
"UNKNOWN" ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
"+" ...
"-" ...


at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)




Query:
tableEnv.sqlUpdate(
  """
|
|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, 
`p_month` = p_month)
|select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and 
`p_month` = 4
|
|""".stripMargin)



On 2020-05-28 13:39:49, "Leonard Xu" wrote:
>Hi,
>>|select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` 
>> = 5
>
>It's probably because select * brings the partition columns along too, so the field lists no longer match; list just the columns you need in the select.
> 
>Best,
>Leonard Xu
>
>> On May 28, 2020, at 12:57, Zhou Zach wrote:
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>> caused an error: Field types of query result and registered TableSink 
>> dwdCatalog.dwd.t1_copy do not match.
>> 
>> Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, EXPR$4: 
>> INT NOT NULL, EXPR$5: INT NOT NULL]
>> 
>> Sink schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT]
>> 
>> at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> 
>> at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> 
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> 
>> at 
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> 
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> 
>> at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> 
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> 
>> at java.security.AccessController.doPrivileged(Native Method)
>> 
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> 
>> at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> 
>> at 
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> 
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> 
>> 
>> 
>> 
>> Hive partitioned tables:
>> CREATE TABLE `dwd.t1`(
>>  `id` bigint, 
>>  `name` string)
>> PARTITIONED BY ( 
>>  `p_year` int, 
>>  `p_month` int)
>> 
>> 
>> CREATE TABLE `dwd.t1_copy`(
>>  `id` bigint, 
>>  `name` string)
>> PARTITIONED BY ( 
>>  `p_year` int, 
>>  `p_month` int)
>> 
>> 
>> Flink sql:
>> tableEnv.sqlUpdate(
>>  """
>>|
>>|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = 2020, 
>> `p_month` = 5)
>>|select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` 
>> = 5
>>|
>>|""".stripMargin)
>> 
>> 
>> thanks for your help


Re: flink sql fails to write to a Hive partitioned table

2020-05-27 Posted by Leonard Xu
Hi,
>|select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 
> 5

It's probably because select * brings the partition columns along too, so the field lists no longer match; list just the columns you need in the select.

Best,
Leonard Xu

> On May 28, 2020, at 12:57, Zhou Zach wrote:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Field types of query result and registered TableSink 
> dwdCatalog.dwd.t1_copy do not match.
> 
> Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, EXPR$4: 
> INT NOT NULL, EXPR$5: INT NOT NULL]
> 
> Sink schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT]
> 
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> 
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> 
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> 
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> 
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> 
> at java.security.AccessController.doPrivileged(Native Method)
> 
> at javax.security.auth.Subject.doAs(Subject.java:422)
> 
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> 
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> 
> 
> 
> 
> Hive partitioned tables:
> CREATE TABLE `dwd.t1`(
>  `id` bigint, 
>  `name` string)
> PARTITIONED BY ( 
>  `p_year` int, 
>  `p_month` int)
> 
> 
> CREATE TABLE `dwd.t1_copy`(
>  `id` bigint, 
>  `name` string)
> PARTITIONED BY ( 
>  `p_year` int, 
>  `p_month` int)
> 
> 
> Flink sql:
> tableEnv.sqlUpdate(
>  """
>|
>|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = 2020, 
> `p_month` = 5)
>|select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 
> 5
>|
>|""".stripMargin)
> 
> 
> thanks for your help



flink sql fails to write to a Hive partitioned table

2020-05-27 Posted by Zhou Zach
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Field types of query result and registered TableSink 
dwdCatalog.dwd.t1_copy do not match.

Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, EXPR$4: INT 
NOT NULL, EXPR$5: INT NOT NULL]

Sink schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT]

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)

at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)

at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)




Hive partitioned tables:
CREATE TABLE `dwd.t1`(
  `id` bigint, 
  `name` string)
PARTITIONED BY ( 
  `p_year` int, 
  `p_month` int)
  
  
CREATE TABLE `dwd.t1_copy`(
  `id` bigint, 
  `name` string)
PARTITIONED BY ( 
  `p_year` int, 
  `p_month` int)


Flink sql:
tableEnv.sqlUpdate(
  """
|
|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = 2020, 
`p_month` = 5)
|select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 5
|
|""".stripMargin)


thanks for your help

Re: how to implement a custom connector in flink

2020-05-27 Posted by 111
Hi,
In the sql-gateway, class loading is child-first by default, and every job submission gets its own TableEnvironment and its own classloader, so this problem does not arise.
Best,
Xinghalo

Re: how to implement a custom connector in flink

2020-05-27 Posted by Lijie Wang
This problem may indeed occur. When adding dependencies, just take care not to bundle the packages that Flink already provides. It can also be avoided by setting parent-first class loading.




On May 28, 2020 at 11:03, forideal wrote:
Hi 111,

Regarding the second point:
`2 Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib`
Doesn't doing this carry the following `X can't be cast to X` risk?
Putting the connector into lib can lead to classloader problems, whose direct symptom is the X can't be cast to X
issue [1]. Of course this only means it may happen. For example, if we put user code into Flink's lib, we find that when we run a job by
uploading a jar, and that jar also contains the code that is in lib, this problem is triggered.
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html#x-cannot-be-cast-to-x-exceptions


Best
forideal








On 2020-05-28 10:16:45, "111" wrote:
Hi,
To use it in the sql-gateway, check the following conditions:
1 Meet the SPI requirements so that Flink can automatically discover the implementation class
2 Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib
3 If integrating with Hive through HiveCatalog, register the table first
Then it can be used.
Best,
Xinghalo


Re: Re: how to implement a custom connector in flink

2020-05-27 Posted by forideal
Hi 111,

Regarding the second point:
`2 Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib`
Doesn't doing this carry the following `X can't be cast to X` risk?
Putting the connector into lib can lead to classloader problems, whose direct symptom is the X can't be cast to X
issue [1]. Of course this only means it may happen. For example, if we put user code into Flink's lib, we find that when we run a job by
uploading a jar, and that jar also contains the code that is in lib, this problem is triggered.
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html#x-cannot-be-cast-to-x-exceptions


Best
 forideal








On 2020-05-28 10:16:45, "111" wrote:
>Hi,
>To use it in the sql-gateway, check the following conditions:
>1 Meet the SPI requirements so that Flink can automatically discover the implementation class
>2 Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib
>3 If integrating with Hive through HiveCatalog, register the table first
>Then it can be used.
>Best,
>Xinghalo


Re: Pushing code to flink

2020-05-27 Posted by tison
Flink's hallmark is being fast (x)

Best,
tison.


宇张 wrote on Thu, May 28, 2020 at 10:56 AM:

> Thanks, everyone. I see Leonard Xu is already following
> FLINK-17991; what a fast response.
>
> On Thu, May 28, 2020 at 10:25 AM Leonard Xu  wrote:
>
> > Hi,
> > Yangze posted the official guide; you can also look at Jark's blog post [1], which is in Chinese and quicker to read.
> >
> > Best,
> > Leonard Xu
> > [1] https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/ <
> > https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/>
> >
> > > On May 28, 2020, at 10:18, Yangze Guo wrote:
> > >
> > > Hello, here is the community's code contribution guide [1].
> > >
> > > Tip: unless it is a hotfix, please make sure consensus has been reached on the ticket and a committer has assigned it to you before submitting.
> > >
> > > [1] https://flink.apache.org/zh/contributing/contribute-code.html
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Thu, May 28, 2020 at 10:15 AM 宇张  wrote:
> > >>
> > >> Found the tutorial.
> > >>
> > >>
> > >> On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:
> > >>
> > >>> hi,
> > >>> What is the process for pushing code to the flink community? Is there a beginner-friendly guide?
> > >>>
> >
> >
>


Re: Pushing code to flink

2020-05-27 Posted by 宇张
Thanks, everyone. I see Leonard Xu is already following
FLINK-17991; what a fast response.

On Thu, May 28, 2020 at 10:25 AM Leonard Xu  wrote:

> Hi,
> Yangze posted the official guide; you can also look at Jark's blog post [1], which is in Chinese and quicker to read.
>
> Best,
> Leonard Xu
> [1] https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/ <
> https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/>
>
> > On May 28, 2020, at 10:18, Yangze Guo wrote:
> >
> > Hello, here is the community's code contribution guide [1].
> > 
> > Tip: unless it is a hotfix, please make sure consensus has been reached on the ticket and a committer has assigned it to you before submitting.
> >
> > [1] https://flink.apache.org/zh/contributing/contribute-code.html
> >
> > Best,
> > Yangze Guo
> >
> > On Thu, May 28, 2020 at 10:15 AM 宇张  wrote:
> >>
> >> Found the tutorial.
> >>
> >>
> >> On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:
> >>
> >>> hi,
> >>> What is the process for pushing code to the flink community? Is there a beginner-friendly guide?
> >>>
>
>


Re: Does the Table API have an equivalent of the SQL API's TO_TIMESTAMP?

2020-05-27 Posted by macia kk
Get it, thanks

Leonard Xu wrote on Thu, May 28, 2020 at 10:34 AM:

>
> > I had assumed the Table API gave more freedom than the SQL API, since it can be customized at the code level
>
>
> The Table API is indeed more flexible. It is just that over the last two releases the SQL module's community effort has mainly gone into DDL, which lowers the entry barrier and improves usability, so the Table
> API has iterated a bit more slowly.
> My understanding is that 1.12 should round out the Descriptor API, which is also an important user entry point; for now I recommend using DDL first.
>
> Best,
> Leonard Xu
>
> > On May 28, 2020, at 10:23, macia kk wrote:
> >
> > OK, thanks.
> >
> > Giving up on this route; I'll try DDL first and get the job running. I had assumed the Table API gave more freedom than the SQL API, since it can be customized at the code level
> >
> > Leonard Xu wrote on Thu, May 28, 2020 at 10:17 AM:
> >
> >> Hi,
> >> I looked at the Descriptor code; if the source is Kafka there should be a way around it, but it is very convoluted. You can give it a try, but I recommend using DDL.
> >>
> >> Best
> >> Leonard Xu
> >>
> >> [1]
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
> >> <
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
> >>>
> >> [2]
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
> >> <
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
> >>>
> >>
> >>
> >>
> >>> On May 28, 2020, at 00:45, macia kk wrote:
> >>>
> >>> Hi all,
> >>>
> >>>  .field("event_time", TIMESTAMP()).rowtime(
> >>> new Rowtime()
> >>> .timestampsFromField("maxwell_ts")
> >>> .watermarksPeriodicBounded(6)
> >>>   )
> >>>
> >>>
> >>> My maxwell_ts is in milliseconds; using it directly like this throws:
> >>>
> >>> Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
> >>> physical type
> >>>
> >>>
> >>> Is there something like
> >>>
> >>> event_time as to_timestamp(maxwell_ts)
> >>>
> >>>
> >>> available for this?
> >>
> >>
>
>


Re: Does the Table API have an equivalent of the SQL API's TO_TIMESTAMP?

2020-05-27 Posted by Leonard Xu


> I had assumed the Table API gave more freedom than the SQL API, since it can be customized at the code level


The Table API is indeed more flexible. It is just that over the last two releases the SQL module's community effort has mainly gone into DDL, which lowers the entry barrier and improves usability, so the Table
API has iterated a bit more slowly.
My understanding is that 1.12 should round out the Descriptor API, which is also an important user entry point; for now I recommend using DDL first.

Best,
Leonard Xu

> On May 28, 2020, at 10:23, macia kk wrote:
> 
> OK, thanks.
> 
> Giving up on this route; I'll try DDL first and get the job running. I had assumed the Table API gave more freedom than the SQL API, since it can be customized at the code level
> 
> Leonard Xu wrote on Thu, May 28, 2020 at 10:17 AM:
> 
>> Hi,
>> I looked at the Descriptor code; if the source is Kafka there should be a way around it, but it is very convoluted. You can give it a try, but I recommend using DDL.
>> 
>> Best
>> Leonard Xu
>> 
>> [1]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
>> <
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
>>> 
>> [2]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
>> <
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
>>> 
>> 
>> 
>> 
>>> On May 28, 2020, at 00:45, macia kk wrote:
>>> 
>>> Hi all,
>>> 
>>>  .field("event_time", TIMESTAMP()).rowtime(
>>> new Rowtime()
>>> .timestampsFromField("maxwell_ts")
>>> .watermarksPeriodicBounded(6)
>>>   )
>>> 
>>> 
>>> My maxwell_ts is in milliseconds; using it directly like this throws:
>>> 
>>> Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
>>> physical type
>>> 
>>> 
>>> Is there something like
>>> 
>>> event_time as to_timestamp(maxwell_ts)
>>> 
>>> 
>>> available for this?
>> 
>> 



Re: Pushing code to flink

2020-05-27 Posted by Leonard Xu
Hi,
Yangze posted the official guide; you can also look at Jark's blog post [1], which is in Chinese and quicker to read.

Best,
Leonard Xu
[1] https://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/ 


> On May 28, 2020, at 10:18, Yangze Guo wrote:
> 
> Hello, here is the community's code contribution guide [1].
> 
> Tip: unless it is a hotfix, please make sure consensus has been reached on the ticket and a committer has assigned it to you before submitting.
> 
> [1] https://flink.apache.org/zh/contributing/contribute-code.html
> 
> Best,
> Yangze Guo
> 
> On Thu, May 28, 2020 at 10:15 AM 宇张  wrote:
>> 
>> Found the tutorial.
>> 
>> 
>> On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:
>> 
>>> hi,
>>> What is the process for pushing code to the flink community? Is there a beginner-friendly guide?
>>> 



Re: Does the Table API have an equivalent of the SQL API's TO_TIMESTAMP?

2020-05-27 Posted by macia kk
OK, thanks.

Giving up on this route; I'll try DDL first and get the job running. I had assumed the Table API gave more freedom than the SQL API, since it can be customized at the code level

Leonard Xu wrote on Thu, May 28, 2020 at 10:17 AM:

> Hi,
> I looked at the Descriptor code; if the source is Kafka there should be a way around it, but it is very convoluted. You can give it a try, but I recommend using DDL.
>
> Best
> Leonard Xu
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
> <
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
> >
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
> <
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
> >
>
>
>
> > On May 28, 2020, at 00:45, macia kk wrote:
> >
> > Hi all,
> >
> >   .field("event_time", TIMESTAMP()).rowtime(
> >  new Rowtime()
> >  .timestampsFromField("maxwell_ts")
> >  .watermarksPeriodicBounded(6)
> >)
> >
> >
> > My maxwell_ts is in milliseconds; using it directly like this throws:
> >
> > Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
> > physical type
> >
> >
> > Is there something like
> >
> > event_time as to_timestamp(maxwell_ts)
> >
> >
> > available for this?
>
>


Re: Pushing code to flink

2020-05-27 Posted by Yangze Guo
Hello, here is the community's code contribution guide [1].

Tip: unless it is a hotfix, please make sure consensus has been reached on the ticket and a committer has assigned it to you before submitting.

[1] https://flink.apache.org/zh/contributing/contribute-code.html

Best,
Yangze Guo

On Thu, May 28, 2020 at 10:15 AM 宇张  wrote:
>
> Found the tutorial.
>
>
> On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:
>
> > hi,
> > What is the process for pushing code to the flink community? Is there a beginner-friendly guide?
> >


Re: Does the Table API have an equivalent of the SQL API's TO_TIMESTAMP?

2020-05-27 Posted by Leonard Xu
Hi,
I looked at the Descriptor code; if the source is Kafka there should be a way around it, but it is very convoluted. You can give it a try, but I recommend using DDL.

Best
Leonard Xu

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
[2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
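For reference, a rough sketch of the DDL route for a millisecond epoch field such as maxwell_ts, assuming a Kafka source and that the FROM_UNIXTIME/TO_TIMESTAMP built-ins are available in your version; the table name, topic, and connector options below are placeholders, not values from this thread:

tableEnv.sqlUpdate(
  """
    |CREATE TABLE my_source (
    |  maxwell_ts BIGINT,
    |  -- computed column: millisecond epoch to TIMESTAMP(3)
    |  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts / 1000)),
    |  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = 'universal',
    |  'connector.topic' = 'my_topic',
    |  'connector.properties.bootstrap.servers' = 'localhost:9092',
    |  'format.type' = 'json'
    |)
    |""".stripMargin)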




> On May 28, 2020, at 00:45, macia kk wrote:
> 
> Hi all,
> 
>   .field("event_time", TIMESTAMP()).rowtime(
>  new Rowtime()
>  .timestampsFromField("maxwell_ts")
>  .watermarksPeriodicBounded(6)
>)
> 
> 
> My maxwell_ts is in milliseconds; using it directly like this throws:
> 
> Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
> physical type
> 
> 
> Is there something like
> 
> event_time as to_timestamp(maxwell_ts)
> 
> 
> available for this?



Re: how to implement a custom connector in flink

2020-05-27 Posted by 111
Hi,
To use it in the sql-gateway, check the following conditions:
1 Meet the SPI requirements so that Flink can automatically discover the implementation class
2 Set the FLINK_HOME environment variable and put the custom connector under FLINK_HOME/lib
3 If integrating with Hive through HiveCatalog, register the table first
Then it can be used.
Best,
Xinghalo

Re: Pushing code to flink

2020-05-27 Posted by 宇张
Found the tutorial.


On Thu, May 28, 2020 at 10:10 AM 宇张  wrote:

> hi,
> What is the process for pushing code to the flink community? Is there a beginner-friendly guide?
>


Re: how to implement a custom connector in flink

2020-05-27 Posted by 111
Hi,
Just open an existing connector such as the JDBC one and model yours on it:
1 You need a service-loader descriptor file: under resources there must be a META-INF/services file declaring the TableFactory implementation class
2 Create the corresponding TableFactory implementation; depending on whether it is a source or a sink, implement the respective interface and return the matching TableSource or TableSink
3 For a TableSource, implement additional interfaces depending on features such as projection push-down and lookup
4 For a TableSink, implement the interface matching its upsert, append, or retract mode
In short, open the source of an existing connector and copy the pattern; a minimal sketch of steps 1 and 2 is shown below.
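The sketch below assumes the Flink 1.10 TableFactory SPI; MyTableSourceFactory and the "my-connector" identifier are hypothetical names, and the actual StreamTableSource implementation is left out:

// File: src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
//   com.example.MyTableSourceFactory

import java.util
import org.apache.flink.table.factories.StreamTableSourceFactory
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

class MyTableSourceFactory extends StreamTableSourceFactory[Row] {

  // Matched against the 'connector.type' property of the table definition.
  override def requiredContext(): util.Map[String, String] = {
    val context = new util.HashMap[String, String]()
    context.put("connector.type", "my-connector") // hypothetical connector id
    context
  }

  // Every property the connector accepts must be listed here,
  // otherwise property validation rejects the table definition.
  override def supportedProperties(): util.List[String] = {
    val properties = new util.ArrayList[String]()
    properties.add("connector.path")      // example custom option
    properties.add("schema.#.name")       // allow schema properties
    properties.add("schema.#.data-type")
    properties
  }

  // Build the actual TableSource from the flattened property map.
  override def createStreamTableSource(
      properties: util.Map[String, String]): StreamTableSource[Row] = {
    // A real connector constructs and returns its StreamTableSource here.
    throw new UnsupportedOperationException("replace with your StreamTableSource")
  }
}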
Best,
Xinghalo

Pushing code to flink

2020-05-27 Posted by 宇张
hi,
What is the process for pushing code to the flink community? Is there a beginner-friendly guide?


Re: how to implement a custom connector in flink

2020-05-27 Posted by Peihui He
hello


I have already defined a TableSource and can register it and query data via batchEnv.registerTableSource,
but how do I configure it in the sql-gateway?

Leonard Xu wrote on Thu, May 28, 2020 at 9:32 AM:

> Hi,
> You can look at the existing connectors such as hbase and jdbc, and implement a custom connector following [1].
>
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
> >
>
>
> > On May 28, 2020, at 09:16, Peihui He wrote:
> >
> > hello
> >
> >Could anyone advise how to implement a custom connector in Flink 1.10 and register it with the flink sql
> > gateway so that SQL statements can be run against it?
> >
> >
> > best wish
>
>


Re: how to set up a client for flink 1.9/1.10 on yarn with CDH

2020-05-27 Posted by wangweigu...@stevegame.cn

Indeed. As long as you configure the CDH HADOOP_CONF environment variable and download an open-source Hadoop build (same version as your CDH) into flink's lib directory, you can access CDH
yarn and submit jobs!

Right now I'm on CDH 5.16.1 with Flink 1.10; submitting Flink on yarn works, jobs run fine, and Flink on hive works as well!

flink-shaded-hadoop-2-uber-2.6.5-10.0.jar



 
From: 111
Sent: 2020-05-28 09:13
To: user-zh@flink.apache.org
Subject: Re: how to set up a client for flink 1.9/1.10 on yarn with CDH
Hi,
Generally, as long as you have a yarn environment, you can download the flink distribution on any machine, set the HADOOP_CONF environment variable, and use it.
For session mode: start a yarn session cluster with yarn-session.sh, then submit programs with flink run xxx.
For per-job mode: just use flink run directly.
best,
Xinghalo


Re: how to implement a custom connector in flink

2020-05-27 Posted by Leonard Xu
Hi,
You can look at the existing connectors such as hbase and jdbc, and implement a custom connector following [1].


Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
 



> On May 28, 2020, at 09:16, Peihui He wrote:
> 
> hello
> 
>Could anyone advise how to implement a custom connector in Flink 1.10 and register it with the flink sql
> gateway so that SQL statements can be run against it?
> 
> 
> best wish



How to implement a custom connector in flink

2020-05-27 Posted by Peihui He
hello

Could anyone advise how to implement a custom connector in Flink 1.10 and register it with the flink sql
gateway so that SQL statements can be run against it?


best wish


Re: how to set up a client for flink 1.9/1.10 on yarn with CDH

2020-05-27 Posted by 111
Hi,
Generally, as long as you have a yarn environment, you can download the flink distribution on any machine, set the HADOOP_CONF environment variable, and use it.
For session mode: start a yarn session cluster with yarn-session.sh, then submit programs with flink run xxx.
For per-job mode: just use flink run directly.
best,
Xinghalo

How to set up a client for flink 1.9/1.10 on yarn with CDH

2020-05-27 Posted by 王飞
How do I set up a client for flink 1.9/1.10 on CDH? I need a client to launch flink on yarn. Version 1.7 works fine, but 1.9
and 1.10 have trouble starting on-yarn jobs. My environment is CDH Hadoop. Thanks for any answers.



Does the Table API have an equivalent of the SQL API's TO_TIMESTAMP?

2020-05-27 Posted by macia kk
Hi all,

   .field("event_time", TIMESTAMP()).rowtime(
  new Rowtime()
  .timestampsFromField("maxwell_ts")
  .watermarksPeriodicBounded(6)
)


My maxwell_ts is in milliseconds; using it directly like this throws:

Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
physical type


Is there something like

event_time as to_timestamp(maxwell_ts)


available for this?


Re: Flink sql across databases

2020-05-27 Posted by Leonard Xu

Glad the problem is solved.

> Found the cause: flink treats year as a keyword

YEAR is used by the time-related types and is a keyword in FLINK SQL (and in some databases too), so it must be escaped when used as an identifier; the same goes for DAY, MONTH, etc. [1].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/types.html#interval-year-to-month


Best
Leonard Xu

> 
> 
> 
> At 2020-05-27 19:09:43, "Zhou Zach"  wrote:
>> The program finished with the following exception:
>> 
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>> caused an error: SQL parse failed. Encountered "year =" at line 4, column 51.
>> Was expecting one of:
>>   "ARRAY" ...
>>   "CASE" ...
>>   "CURRENT" ...
>>   "CURRENT_CATALOG" ...
>>   "CURRENT_DATE" ...
>>   "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>>   "CURRENT_PATH" ...
>>   "CURRENT_ROLE" ...
>>   "CURRENT_SCHEMA" ...
>>   "CURRENT_TIME" ...
>>   "CURRENT_TIMESTAMP" ...
>>   "CURRENT_USER" ...
>>   "DATE" ...
>>   "EXISTS" ...
>>   "FALSE" ...
>>   "INTERVAL" ...
>>   "LOCALTIME" ...
>>   "LOCALTIMESTAMP" ...
>>   "MULTISET" ...
>>   "NEW" ...
>>   "NEXT" ...
>>   "NOT" ...
>>   "NULL" ...
>>   "PERIOD" ...
>>   "SESSION_USER" ...
>>   "SYSTEM_USER" ...
>>   "TIME" ...
>>   "TIMESTAMP" ...
>>   "TRUE" ...
>>   "UNKNOWN" ...
>>   "USER" ...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>   "?" ...
>>   "+" ...
>>   "-" ...
>>...
>>...
>>...
>>...
>>...
>>   "CAST" ...
>>   "EXTRACT" ...
>>   "POSITION" ...
>>   "CONVERT" ...
>>   "TRANSLATE" ...
>>   "OVERLAY" ...
>>   "FLOOR" ...
>>   "CEIL" ...
>>   "CEILING" ...
>>   "SUBSTRING" ...
>>   "TRIM" ...
>>   "CLASSIFIER" ...
>>   "MATCH_NUMBER" ...
>>   "RUNNING" ...
>>   "PREV" ...
>>   "JSON_EXISTS" ...
>>   "JSON_VALUE" ...
>>   "JSON_QUERY" ...
>>   "JSON_OBJECT" ...
>>   "JSON_OBJECTAGG" ...
>>   "JSON_ARRAY" ...
>>   "JSON_ARRAYAGG" ...
>>   "MAP" ...
>>   "SPECIFIC" ...
>>   "ABS" ...
>>   "AVG" ...
>>   "CARDINALITY" ...
>>   "CHAR_LENGTH" ...
>>   "CHARACTER_LENGTH" ...
>>   "COALESCE" ...
>>   "COLLECT" ...
>>   "COVAR_POP" ...
>>   "COVAR_SAMP" ...
>>   "CUME_DIST" ...
>>   "COUNT" ...
>>   "DENSE_RANK" ...
>>   "ELEMENT" ...
>>   "EXP" ...
>>   "FIRST_VALUE" ...
>>   "FUSION" ...
>>   "GROUPING" ...
>>   "HOUR" ...
>>   "LAG" ...
>>   "LEAD" ...
>>   "LEFT" ...
>>   "LAST_VALUE" ...
>>   "LN" ...
>>   "LOWER" ...
>>   "MAX" ...
>>   "MIN" ...
>>   "MINUTE" ...
>>   "MOD" ...
>>   "MONTH" ...
>>   "NTH_VALUE" ...
>>   "NTILE" ...
>>   "NULLIF" ...
>>   "OCTET_LENGTH" ...
>>   "PERCENT_RANK" ...
>>   "POWER" ...
>>   "RANK" ...
>>   "REGR_COUNT" ...
>>   "REGR_SXX" ...
>>   "REGR_SYY" ...
>>   "RIGHT" ...
>>   "ROW_NUMBER" ...
>>   "SECOND" ...
>>   "SQRT" ...
>>   "STDDEV_POP" ...
>>   "STDDEV_SAMP" ...
>>   "SUM" ...
>>   "UPPER" ...
>>   "TRUNCATE" ...
>>   "VAR_POP" ...
>>   "VAR_SAMP" ...
>>   "YEAR" ...
>>   "YEAR" "(" ...
>> 
>> 
>> at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at 
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> at 
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
>> Encountered "year =" at line 4, column 51.
>> Was expecting one of:
>>   "ARRAY" ...
>>   "CASE" ...
>>   "CURRENT" ...
>>   "CURRENT_CATALOG" ...
>>   "CURRENT_DATE" ...
>>   "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>>   "CURRENT_PATH" ...
>>   "CURRENT_ROLE" ...
>>   "CURRENT_SCHEMA" ...
>>   "CURRENT_TIME" ...
>>   "CURRENT_TIMESTAMP" ...
>>   "CURRENT_USER" ...
>>   "DATE" ...
>>   "EXISTS" ...
>>   "FALSE" ...
>>   "INTERVAL" ...
>>   "LOCALTIME" ...
>>   "LOCALTIMESTAMP" ...
>>   "MULTISET" ...
>>   "NEW" ...
>>   "NEXT" ...
>>   "NOT" ...
>>   "NULL" ...
>>   "PERIOD" ...
>>   "SESSION_USER" ...
>>   "SYSTEM_USER" ...
>>   "TIME" ...
>>   "TIMESTAMP" ...
>>   "TRUE" ...
>>   "UNKNOWN" ...
>>   "USER" ...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>...
>>   "?" ...
>>   "+" ...
>>   "-" ...
>>...
>>...
>>

Re:Re: Re: Re: Re: Flink sql across databases

2020-05-27 Posted by Zhou Zach
OK, thanks for the pointers.



On 2020-05-27 19:33:42, "Rui Li" wrote:
>Are you trying to debug the HiveCatalog code? You can refer to the test cases in flink: some tests run with an embedded metastore (e.g. HiveCatalogHiveMetadataTest), and some start a separate HMS process (e.g. TableEnvHiveConnectorTest).
>
>On Wed, May 27, 2020 at 7:27 PM Zhou Zach  wrote:
>
>> Yes, I noticed, thanks for the pointer. One more question: when debugging with intellij
>> idea, do you debug locally? If so, do I need a local hadoop cluster, or at least a local hive? Or do you connect to a remote cluster from intellij
>> idea? If the cluster is on Alibaba Cloud, would extra ports need to be opened?
>>
>>
>>
>> On 2020-05-27 19:19:58, "Rui Li" wrote:
>> >year is a reserved keyword in calcite; try writing it as `year`
>> >
>> >On Wed, May 27, 2020 at 7:09 PM Zhou Zach  wrote:
>> >
>> >> The program finished with the following exception:
>> >>
>> >>
>> >> org.apache.flink.client.program.ProgramInvocationException: The main
>> >> method caused an error: SQL parse failed. Encountered "year =" at line
>> 4,
>> >> column 51.
>> >> Was expecting one of:
>> >> "ARRAY" ...
>> >> "CASE" ...
>> >> "CURRENT" ...
>> >> "CURRENT_CATALOG" ...
>> >> "CURRENT_DATE" ...
>> >> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>> >> "CURRENT_PATH" ...
>> >> "CURRENT_ROLE" ...
>> >> "CURRENT_SCHEMA" ...
>> >> "CURRENT_TIME" ...
>> >> "CURRENT_TIMESTAMP" ...
>> >> "CURRENT_USER" ...
>> >> "DATE" ...
>> >> "EXISTS" ...
>> >> "FALSE" ...
>> >> "INTERVAL" ...
>> >> "LOCALTIME" ...
>> >> "LOCALTIMESTAMP" ...
>> >> "MULTISET" ...
>> >> "NEW" ...
>> >> "NEXT" ...
>> >> "NOT" ...
>> >> "NULL" ...
>> >> "PERIOD" ...
>> >> "SESSION_USER" ...
>> >> "SYSTEM_USER" ...
>> >> "TIME" ...
>> >> "TIMESTAMP" ...
>> >> "TRUE" ...
>> >> "UNKNOWN" ...
>> >> "USER" ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >> "?" ...
>> >> "+" ...
>> >> "-" ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >>  ...
>> >> "CAST" ...
>> >> "EXTRACT" ...
>> >> "POSITION" ...
>> >> "CONVERT" ...
>> >> "TRANSLATE" ...
>> >> "OVERLAY" ...
>> >> "FLOOR" ...
>> >> "CEIL" ...
>> >> "CEILING" ...
>> >> "SUBSTRING" ...
>> >> "TRIM" ...
>> >> "CLASSIFIER" ...
>> >> "MATCH_NUMBER" ...
>> >> "RUNNING" ...
>> >> "PREV" ...
>> >> "JSON_EXISTS" ...
>> >> "JSON_VALUE" ...
>> >> "JSON_QUERY" ...
>> >> "JSON_OBJECT" ...
>> >> "JSON_OBJECTAGG" ...
>> >> "JSON_ARRAY" ...
>> >> "JSON_ARRAYAGG" ...
>> >> "MAP" ...
>> >> "SPECIFIC" ...
>> >> "ABS" ...
>> >> "AVG" ...
>> >> "CARDINALITY" ...
>> >> "CHAR_LENGTH" ...
>> >> "CHARACTER_LENGTH" ...
>> >> "COALESCE" ...
>> >> "COLLECT" ...
>> >> "COVAR_POP" ...
>> >> "COVAR_SAMP" ...
>> >> "CUME_DIST" ...
>> >> "COUNT" ...
>> >> "DENSE_RANK" ...
>> >> "ELEMENT" ...
>> >> "EXP" ...
>> >> "FIRST_VALUE" ...
>> >> "FUSION" ...
>> >> "GROUPING" ...
>> >> "HOUR" ...
>> >> "LAG" ...
>> >> "LEAD" ...
>> >> "LEFT" ...
>> >> "LAST_VALUE" ...
>> >> "LN" ...
>> >> "LOWER" ...
>> >> "MAX" ...
>> >> "MIN" ...
>> >> "MINUTE" ...
>> >> "MOD" ...
>> >> "MONTH" ...
>> >> "NTH_VALUE" ...
>> >> "NTILE" ...
>> >> "NULLIF" ...
>> >> "OCTET_LENGTH" ...
>> >> "PERCENT_RANK" ...
>> >> "POWER" ...
>> >> "RANK" ...
>> >> "REGR_COUNT" ...
>> >> "REGR_SXX" ...
>> >> "REGR_SYY" ...
>> >> "RIGHT" ...
>> >> "ROW_NUMBER" ...
>> >> "SECOND" ...
>> >> "SQRT" ...
>> >> "STDDEV_POP" ...
>> >> "STDDEV_SAMP" ...
>> >> "SUM" ...
>> >> "UPPER" ...
>> >> "TRUNCATE" ...
>> >> "VAR_POP" ...
>> >> "VAR_SAMP" ...
>> >> "YEAR" ...
>> >> "YEAR" "(" ...
>> >>
>> >>
>> >> at
>> >>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> >> at
>> >>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> >> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> >> at
>> >>
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> >> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> >> at
>> >>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> >> at
>> >>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> >> at java.security.AccessController.doPrivileged(Native Method)
>> >> at javax.security.auth.Subject.doAs(Subject.java:422)
>> >> at
>> >>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> >> at
>> >>
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> >> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> >> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
>> >> failed. Encounte

Re: Re: Re: Re: Flink sql across databases

2020-05-27 Posted by Rui Li
Are you trying to debug the HiveCatalog code? You can refer to the test cases in flink: some tests run with an embedded metastore (e.g. HiveCatalogHiveMetadataTest), and some start a separate HMS process (e.g. TableEnvHiveConnectorTest).
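For local experiments, a rough sketch of how a HiveCatalog gets wired into a TableEnvironment; the catalog name, hive-conf path, and Hive version below are placeholders, not values from this thread:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)

// name, default database, directory containing hive-site.xml, Hive version (all placeholders)
val hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf", "2.3.4")
tableEnv.registerCatalog("myhive", hiveCatalog)
tableEnv.useCatalog("myhive")

// Tables can then be referenced as catalog.db.table, e.g. myhive.dwd.t1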

On Wed, May 27, 2020 at 7:27 PM Zhou Zach  wrote:

> Yes, I noticed, thanks for the pointer. One more question: when debugging with intellij
> idea, do you debug locally? If so, do I need a local hadoop cluster, or at least a local hive? Or do you connect to a remote cluster from intellij
> idea? If the cluster is on Alibaba Cloud, would extra ports need to be opened?
>
>
>
> On 2020-05-27 19:19:58, "Rui Li" wrote:
> >year is a reserved keyword in calcite; try writing it as `year`
> >
> >On Wed, May 27, 2020 at 7:09 PM Zhou Zach  wrote:
> >
> >> The program finished with the following exception:
> >>
> >>
> >> org.apache.flink.client.program.ProgramInvocationException: The main
> >> method caused an error: SQL parse failed. Encountered "year =" at line
> 4,
> >> column 51.
> >> Was expecting one of:
> >> "ARRAY" ...
> >> "CASE" ...
> >> "CURRENT" ...
> >> "CURRENT_CATALOG" ...
> >> "CURRENT_DATE" ...
> >> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
> >> "CURRENT_PATH" ...
> >> "CURRENT_ROLE" ...
> >> "CURRENT_SCHEMA" ...
> >> "CURRENT_TIME" ...
> >> "CURRENT_TIMESTAMP" ...
> >> "CURRENT_USER" ...
> >> "DATE" ...
> >> "EXISTS" ...
> >> "FALSE" ...
> >> "INTERVAL" ...
> >> "LOCALTIME" ...
> >> "LOCALTIMESTAMP" ...
> >> "MULTISET" ...
> >> "NEW" ...
> >> "NEXT" ...
> >> "NOT" ...
> >> "NULL" ...
> >> "PERIOD" ...
> >> "SESSION_USER" ...
> >> "SYSTEM_USER" ...
> >> "TIME" ...
> >> "TIMESTAMP" ...
> >> "TRUE" ...
> >> "UNKNOWN" ...
> >> "USER" ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >> "?" ...
> >> "+" ...
> >> "-" ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >>  ...
> >> "CAST" ...
> >> "EXTRACT" ...
> >> "POSITION" ...
> >> "CONVERT" ...
> >> "TRANSLATE" ...
> >> "OVERLAY" ...
> >> "FLOOR" ...
> >> "CEIL" ...
> >> "CEILING" ...
> >> "SUBSTRING" ...
> >> "TRIM" ...
> >> "CLASSIFIER" ...
> >> "MATCH_NUMBER" ...
> >> "RUNNING" ...
> >> "PREV" ...
> >> "JSON_EXISTS" ...
> >> "JSON_VALUE" ...
> >> "JSON_QUERY" ...
> >> "JSON_OBJECT" ...
> >> "JSON_OBJECTAGG" ...
> >> "JSON_ARRAY" ...
> >> "JSON_ARRAYAGG" ...
> >> "MAP" ...
> >> "SPECIFIC" ...
> >> "ABS" ...
> >> "AVG" ...
> >> "CARDINALITY" ...
> >> "CHAR_LENGTH" ...
> >> "CHARACTER_LENGTH" ...
> >> "COALESCE" ...
> >> "COLLECT" ...
> >> "COVAR_POP" ...
> >> "COVAR_SAMP" ...
> >> "CUME_DIST" ...
> >> "COUNT" ...
> >> "DENSE_RANK" ...
> >> "ELEMENT" ...
> >> "EXP" ...
> >> "FIRST_VALUE" ...
> >> "FUSION" ...
> >> "GROUPING" ...
> >> "HOUR" ...
> >> "LAG" ...
> >> "LEAD" ...
> >> "LEFT" ...
> >> "LAST_VALUE" ...
> >> "LN" ...
> >> "LOWER" ...
> >> "MAX" ...
> >> "MIN" ...
> >> "MINUTE" ...
> >> "MOD" ...
> >> "MONTH" ...
> >> "NTH_VALUE" ...
> >> "NTILE" ...
> >> "NULLIF" ...
> >> "OCTET_LENGTH" ...
> >> "PERCENT_RANK" ...
> >> "POWER" ...
> >> "RANK" ...
> >> "REGR_COUNT" ...
> >> "REGR_SXX" ...
> >> "REGR_SYY" ...
> >> "RIGHT" ...
> >> "ROW_NUMBER" ...
> >> "SECOND" ...
> >> "SQRT" ...
> >> "STDDEV_POP" ...
> >> "STDDEV_SAMP" ...
> >> "SUM" ...
> >> "UPPER" ...
> >> "TRUNCATE" ...
> >> "VAR_POP" ...
> >> "VAR_SAMP" ...
> >> "YEAR" ...
> >> "YEAR" "(" ...
> >>
> >>
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> >> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> >> at
> >>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> >> at java.security.AccessController.doPrivileged(Native Method)
> >> at javax.security.auth.Subject.doAs(Subject.java:422)
> >> at
> >>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> >> at
> >>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> >> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
> >> failed. Encountered "year =" at line 4, column 51.
> >> Was expecting one of:
> >> "ARRAY" ...
> >> "CASE" ...
> >> "CURRENT" ...
> >> "CURRENT_CATALOG" ...
> >> "CURRENT_DATE" ...
> >> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
> >> "CURRENT_PATH"

Re:Re: Re: Re: Flink sql across databases

2020-05-27 Posted by Zhou Zach
Yes, I noticed, thanks for the pointer. One more question: when debugging with intellij
idea, do you debug locally? If so, do I need a local hadoop cluster, or at least a local hive? Or do you connect to a remote cluster from intellij
idea? If the cluster is on Alibaba Cloud, would extra ports need to be opened?



On 2020-05-27 19:19:58, "Rui Li" wrote:
>year is a reserved keyword in calcite; try writing it as `year`
>
>On Wed, May 27, 2020 at 7:09 PM Zhou Zach  wrote:
>
>> The program finished with the following exception:
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: SQL parse failed. Encountered "year =" at line 4,
>> column 51.
>> Was expecting one of:
>> "ARRAY" ...
>> "CASE" ...
>> "CURRENT" ...
>> "CURRENT_CATALOG" ...
>> "CURRENT_DATE" ...
>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>> "CURRENT_PATH" ...
>> "CURRENT_ROLE" ...
>> "CURRENT_SCHEMA" ...
>> "CURRENT_TIME" ...
>> "CURRENT_TIMESTAMP" ...
>> "CURRENT_USER" ...
>> "DATE" ...
>> "EXISTS" ...
>> "FALSE" ...
>> "INTERVAL" ...
>> "LOCALTIME" ...
>> "LOCALTIMESTAMP" ...
>> "MULTISET" ...
>> "NEW" ...
>> "NEXT" ...
>> "NOT" ...
>> "NULL" ...
>> "PERIOD" ...
>> "SESSION_USER" ...
>> "SYSTEM_USER" ...
>> "TIME" ...
>> "TIMESTAMP" ...
>> "TRUE" ...
>> "UNKNOWN" ...
>> "USER" ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>> "?" ...
>> "+" ...
>> "-" ...
>>  ...
>>  ...
>>  ...
>>  ...
>>  ...
>> "CAST" ...
>> "EXTRACT" ...
>> "POSITION" ...
>> "CONVERT" ...
>> "TRANSLATE" ...
>> "OVERLAY" ...
>> "FLOOR" ...
>> "CEIL" ...
>> "CEILING" ...
>> "SUBSTRING" ...
>> "TRIM" ...
>> "CLASSIFIER" ...
>> "MATCH_NUMBER" ...
>> "RUNNING" ...
>> "PREV" ...
>> "JSON_EXISTS" ...
>> "JSON_VALUE" ...
>> "JSON_QUERY" ...
>> "JSON_OBJECT" ...
>> "JSON_OBJECTAGG" ...
>> "JSON_ARRAY" ...
>> "JSON_ARRAYAGG" ...
>> "MAP" ...
>> "SPECIFIC" ...
>> "ABS" ...
>> "AVG" ...
>> "CARDINALITY" ...
>> "CHAR_LENGTH" ...
>> "CHARACTER_LENGTH" ...
>> "COALESCE" ...
>> "COLLECT" ...
>> "COVAR_POP" ...
>> "COVAR_SAMP" ...
>> "CUME_DIST" ...
>> "COUNT" ...
>> "DENSE_RANK" ...
>> "ELEMENT" ...
>> "EXP" ...
>> "FIRST_VALUE" ...
>> "FUSION" ...
>> "GROUPING" ...
>> "HOUR" ...
>> "LAG" ...
>> "LEAD" ...
>> "LEFT" ...
>> "LAST_VALUE" ...
>> "LN" ...
>> "LOWER" ...
>> "MAX" ...
>> "MIN" ...
>> "MINUTE" ...
>> "MOD" ...
>> "MONTH" ...
>> "NTH_VALUE" ...
>> "NTILE" ...
>> "NULLIF" ...
>> "OCTET_LENGTH" ...
>> "PERCENT_RANK" ...
>> "POWER" ...
>> "RANK" ...
>> "REGR_COUNT" ...
>> "REGR_SXX" ...
>> "REGR_SYY" ...
>> "RIGHT" ...
>> "ROW_NUMBER" ...
>> "SECOND" ...
>> "SQRT" ...
>> "STDDEV_POP" ...
>> "STDDEV_SAMP" ...
>> "SUM" ...
>> "UPPER" ...
>> "TRUNCATE" ...
>> "VAR_POP" ...
>> "VAR_SAMP" ...
>> "YEAR" ...
>> "YEAR" "(" ...
>>
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
>> failed. Encountered "year =" at line 4, column 51.
>> Was expecting one of:
>> "ARRAY" ...
>> "CASE" ...
>> "CURRENT" ...
>> "CURRENT_CATALOG" ...
>> "CURRENT_DATE" ...
>> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>> "CURRENT_PATH" ...
>> "CURRENT_ROLE" ...
>> "CURRENT_SCHEMA" ...
>> "CURRENT_TIME" ...
>> "CURRENT_TIMESTAMP" ...
>> "CURRENT_USER" ...
>> "DATE" ...
>> "EXISTS" ...
>> "FALSE" ...
>> "INTERVAL" ...
>> "LOCALTIME" ...
>> "LOCALTIMESTAMP" ...
>> "MULTISET" ...
>> "NEW" ...
>> "NEXT" ...
>> "NOT" ...
>> "NULL" ...
>> "PERIOD" ...
>> "SESSION_USER" ...
>> "SYSTEM_USER" ...
>> "TIME" ...
>> "TIMESTAMP" ...
>> "TRUE" ...
>> "UNKNOWN" ...
>> "USER" ...
>>  ...
>>  ...
>>  

Re:Re:Re: Re: Flink sql across databases

2020-05-27 Posted by Zhou Zach
Found the cause: flink treats year as a keyword.



At 2020-05-27 19:09:43, "Zhou Zach"  wrote:
>The program finished with the following exception:
>
>
>org.apache.flink.client.program.ProgramInvocationException: The main method 
>caused an error: SQL parse failed. Encountered "year =" at line 4, column 51.
>Was expecting one of:
>"ARRAY" ...
>"CASE" ...
>"CURRENT" ...
>"CURRENT_CATALOG" ...
>"CURRENT_DATE" ...
>"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>"CURRENT_PATH" ...
>"CURRENT_ROLE" ...
>"CURRENT_SCHEMA" ...
>"CURRENT_TIME" ...
>"CURRENT_TIMESTAMP" ...
>"CURRENT_USER" ...
>"DATE" ...
>"EXISTS" ...
>"FALSE" ...
>"INTERVAL" ...
>"LOCALTIME" ...
>"LOCALTIMESTAMP" ...
>"MULTISET" ...
>"NEW" ...
>"NEXT" ...
>"NOT" ...
>"NULL" ...
>"PERIOD" ...
>"SESSION_USER" ...
>"SYSTEM_USER" ...
>"TIME" ...
>"TIMESTAMP" ...
>"TRUE" ...
>"UNKNOWN" ...
>"USER" ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
>"?" ...
>"+" ...
>"-" ...
> ...
> ...
> ...
> ...
> ...
>"CAST" ...
>"EXTRACT" ...
>"POSITION" ...
>"CONVERT" ...
>"TRANSLATE" ...
>"OVERLAY" ...
>"FLOOR" ...
>"CEIL" ...
>"CEILING" ...
>"SUBSTRING" ...
>"TRIM" ...
>"CLASSIFIER" ...
>"MATCH_NUMBER" ...
>"RUNNING" ...
>"PREV" ...
>"JSON_EXISTS" ...
>"JSON_VALUE" ...
>"JSON_QUERY" ...
>"JSON_OBJECT" ...
>"JSON_OBJECTAGG" ...
>"JSON_ARRAY" ...
>"JSON_ARRAYAGG" ...
>"MAP" ...
>"SPECIFIC" ...
>"ABS" ...
>"AVG" ...
>"CARDINALITY" ...
>"CHAR_LENGTH" ...
>"CHARACTER_LENGTH" ...
>"COALESCE" ...
>"COLLECT" ...
>"COVAR_POP" ...
>"COVAR_SAMP" ...
>"CUME_DIST" ...
>"COUNT" ...
>"DENSE_RANK" ...
>"ELEMENT" ...
>"EXP" ...
>"FIRST_VALUE" ...
>"FUSION" ...
>"GROUPING" ...
>"HOUR" ...
>"LAG" ...
>"LEAD" ...
>"LEFT" ...
>"LAST_VALUE" ...
>"LN" ...
>"LOWER" ...
>"MAX" ...
>"MIN" ...
>"MINUTE" ...
>"MOD" ...
>"MONTH" ...
>"NTH_VALUE" ...
>"NTILE" ...
>"NULLIF" ...
>"OCTET_LENGTH" ...
>"PERCENT_RANK" ...
>"POWER" ...
>"RANK" ...
>"REGR_COUNT" ...
>"REGR_SXX" ...
>"REGR_SYY" ...
>"RIGHT" ...
>"ROW_NUMBER" ...
>"SECOND" ...
>"SQRT" ...
>"STDDEV_POP" ...
>"STDDEV_SAMP" ...
>"SUM" ...
>"UPPER" ...
>"TRUNCATE" ...
>"VAR_POP" ...
>"VAR_SAMP" ...
>"YEAR" ...
>"YEAR" "(" ...
>
>
>at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>at 
>org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>at 
>org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at 
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>at 
>org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
>Encountered "year =" at line 4, column 51.
>Was expecting one of:
>"ARRAY" ...
>"CASE" ...
>"CURRENT" ...
>"CURRENT_CATALOG" ...
>"CURRENT_DATE" ...
>"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
>"CURRENT_PATH" ...
>"CURRENT_ROLE" ...
>"CURRENT_SCHEMA" ...
>"CURRENT_TIME" ...
>"CURRENT_TIMESTAMP" ...
>"CURRENT_USER" ...
>"DATE" ...
>"EXISTS" ...
>"FALSE" ...
>"INTERVAL" ...
>"LOCALTIME" ...
>"LOCALTIMESTAMP" ...
>"MULTISET" ...
>"NEW" ...
>"NEXT" ...
>"NOT" ...
>"NULL" ...
>"PERIOD" ...
>"SESSION_USER" ...
>"SYSTEM_USER" ...
>"TIME" ...
>"TIMESTAMP" ...
>"TRUE" ...
>"UNKNOWN" ...
>"USER" ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
>"?" ...
>"+" ...
>"-" ...
> ...
> ...
> ...
> ...
> ...
>"CAST" ...
>"EXTRACT" ...
>"POSITION" ...
>"CONVERT" ...
>"TRANSLATE" ...
>"OVERLAY" ...
>"FLOOR" ...
>"CEIL" ...
>"CEILING" ...
>"SUBSTRING" ...
>"TRIM" ...
>"CLASSIFIER" ...
>"MATCH_NUMBER" ...
>"RUNNING" ...
>"PREV" ...
>"JSON_EXISTS" ...
>"JSON_VALUE" ...
>"JSON_QUERY" ...
>"JSON_OBJECT" ...
>  

Re: Re: Re: Flink sql across databases

2020-05-27 Posted by Rui Li
year is a reserved keyword in calcite; try writing it as `year`
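Applied to the query from earlier in this thread, the escaped version would look roughly like this:

tableEnv.sqlUpdate(
  """
    |INSERT INTO catalog2.dwd.orders
    |select srcuid, price from catalog2.dwd.bill where `year` = 2020
    |""".stripMargin)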

On Wed, May 27, 2020 at 7:09 PM Zhou Zach  wrote:

> The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: SQL parse failed. Encountered "year =" at line 4,
> column 51.
> Was expecting one of:
> "ARRAY" ...
> "CASE" ...
> "CURRENT" ...
> "CURRENT_CATALOG" ...
> "CURRENT_DATE" ...
> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
> "CURRENT_PATH" ...
> "CURRENT_ROLE" ...
> "CURRENT_SCHEMA" ...
> "CURRENT_TIME" ...
> "CURRENT_TIMESTAMP" ...
> "CURRENT_USER" ...
> "DATE" ...
> "EXISTS" ...
> "FALSE" ...
> "INTERVAL" ...
> "LOCALTIME" ...
> "LOCALTIMESTAMP" ...
> "MULTISET" ...
> "NEW" ...
> "NEXT" ...
> "NOT" ...
> "NULL" ...
> "PERIOD" ...
> "SESSION_USER" ...
> "SYSTEM_USER" ...
> "TIME" ...
> "TIMESTAMP" ...
> "TRUE" ...
> "UNKNOWN" ...
> "USER" ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
> "?" ...
> "+" ...
> "-" ...
>  ...
>  ...
>  ...
>  ...
>  ...
> "CAST" ...
> "EXTRACT" ...
> "POSITION" ...
> "CONVERT" ...
> "TRANSLATE" ...
> "OVERLAY" ...
> "FLOOR" ...
> "CEIL" ...
> "CEILING" ...
> "SUBSTRING" ...
> "TRIM" ...
> "CLASSIFIER" ...
> "MATCH_NUMBER" ...
> "RUNNING" ...
> "PREV" ...
> "JSON_EXISTS" ...
> "JSON_VALUE" ...
> "JSON_QUERY" ...
> "JSON_OBJECT" ...
> "JSON_OBJECTAGG" ...
> "JSON_ARRAY" ...
> "JSON_ARRAYAGG" ...
> "MAP" ...
> "SPECIFIC" ...
> "ABS" ...
> "AVG" ...
> "CARDINALITY" ...
> "CHAR_LENGTH" ...
> "CHARACTER_LENGTH" ...
> "COALESCE" ...
> "COLLECT" ...
> "COVAR_POP" ...
> "COVAR_SAMP" ...
> "CUME_DIST" ...
> "COUNT" ...
> "DENSE_RANK" ...
> "ELEMENT" ...
> "EXP" ...
> "FIRST_VALUE" ...
> "FUSION" ...
> "GROUPING" ...
> "HOUR" ...
> "LAG" ...
> "LEAD" ...
> "LEFT" ...
> "LAST_VALUE" ...
> "LN" ...
> "LOWER" ...
> "MAX" ...
> "MIN" ...
> "MINUTE" ...
> "MOD" ...
> "MONTH" ...
> "NTH_VALUE" ...
> "NTILE" ...
> "NULLIF" ...
> "OCTET_LENGTH" ...
> "PERCENT_RANK" ...
> "POWER" ...
> "RANK" ...
> "REGR_COUNT" ...
> "REGR_SXX" ...
> "REGR_SYY" ...
> "RIGHT" ...
> "ROW_NUMBER" ...
> "SECOND" ...
> "SQRT" ...
> "STDDEV_POP" ...
> "STDDEV_SAMP" ...
> "SUM" ...
> "UPPER" ...
> "TRUNCATE" ...
> "VAR_POP" ...
> "VAR_SAMP" ...
> "YEAR" ...
> "YEAR" "(" ...
>
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
> failed. Encountered "year =" at line 4, column 51.
> Was expecting one of:
> "ARRAY" ...
> "CASE" ...
> "CURRENT" ...
> "CURRENT_CATALOG" ...
> "CURRENT_DATE" ...
> "CURRENT_DEFAULT_TRANSFORM_GROUP" ...
> "CURRENT_PATH" ...
> "CURRENT_ROLE" ...
> "CURRENT_SCHEMA" ...
> "CURRENT_TIME" ...
> "CURRENT_TIMESTAMP" ...
> "CURRENT_USER" ...
> "DATE" ...
> "EXISTS" ...
> "FALSE" ...
> "INTERVAL" ...
> "LOCALTIME" ...
> "LOCALTIMESTAMP" ...
> "MULTISET" ...
> "NEW" ...
> "NEXT" ...
> "NOT" ...
> "NULL" ...
> "PERIOD" ...
> "SESSION_USER" ...
> "SYSTEM_USER" ...
> "TIME" ...
> "TIMESTAMP" ...
> "TRUE" ...
> "UNKNOWN" ...
> "USER" ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
>  ...
> "?" ...
> "+" ...
> "-" ...
>  ...
>  ...
>  ...
>  ...
>  ...
> "CAST" ...
> "EXTRACT" ...
> "POSITION" ...
> "CONVERT" ...
> "TRANSLATE" ...
> "OVERLAY" ...
> "FLOOR" ...
> "CEIL" ...
> "CEILING" ...
> "SU

Re:Re: Re: Flink sql across databases

2020-05-27 Posted by Zhou Zach
The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: SQL parse failed. Encountered "year =" at line 4, column 51.
Was expecting one of:
"ARRAY" ...
"CASE" ...
"CURRENT" ...
"CURRENT_CATALOG" ...
"CURRENT_DATE" ...
"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
"CURRENT_PATH" ...
"CURRENT_ROLE" ...
"CURRENT_SCHEMA" ...
"CURRENT_TIME" ...
"CURRENT_TIMESTAMP" ...
"CURRENT_USER" ...
"DATE" ...
"EXISTS" ...
"FALSE" ...
"INTERVAL" ...
"LOCALTIME" ...
"LOCALTIMESTAMP" ...
"MULTISET" ...
"NEW" ...
"NEXT" ...
"NOT" ...
"NULL" ...
"PERIOD" ...
"SESSION_USER" ...
"SYSTEM_USER" ...
"TIME" ...
"TIMESTAMP" ...
"TRUE" ...
"UNKNOWN" ...
"USER" ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
"?" ...
"+" ...
"-" ...
 ...
 ...
 ...
 ...
 ...
"CAST" ...
"EXTRACT" ...
"POSITION" ...
"CONVERT" ...
"TRANSLATE" ...
"OVERLAY" ...
"FLOOR" ...
"CEIL" ...
"CEILING" ...
"SUBSTRING" ...
"TRIM" ...
"CLASSIFIER" ...
"MATCH_NUMBER" ...
"RUNNING" ...
"PREV" ...
"JSON_EXISTS" ...
"JSON_VALUE" ...
"JSON_QUERY" ...
"JSON_OBJECT" ...
"JSON_OBJECTAGG" ...
"JSON_ARRAY" ...
"JSON_ARRAYAGG" ...
"MAP" ...
"SPECIFIC" ...
"ABS" ...
"AVG" ...
"CARDINALITY" ...
"CHAR_LENGTH" ...
"CHARACTER_LENGTH" ...
"COALESCE" ...
"COLLECT" ...
"COVAR_POP" ...
"COVAR_SAMP" ...
"CUME_DIST" ...
"COUNT" ...
"DENSE_RANK" ...
"ELEMENT" ...
"EXP" ...
"FIRST_VALUE" ...
"FUSION" ...
"GROUPING" ...
"HOUR" ...
"LAG" ...
"LEAD" ...
"LEFT" ...
"LAST_VALUE" ...
"LN" ...
"LOWER" ...
"MAX" ...
"MIN" ...
"MINUTE" ...
"MOD" ...
"MONTH" ...
"NTH_VALUE" ...
"NTILE" ...
"NULLIF" ...
"OCTET_LENGTH" ...
"PERCENT_RANK" ...
"POWER" ...
"RANK" ...
"REGR_COUNT" ...
"REGR_SXX" ...
"REGR_SYY" ...
"RIGHT" ...
"ROW_NUMBER" ...
"SECOND" ...
"SQRT" ...
"STDDEV_POP" ...
"STDDEV_SAMP" ...
"SUM" ...
"UPPER" ...
"TRUNCATE" ...
"VAR_POP" ...
"VAR_SAMP" ...
"YEAR" ...
"YEAR" "(" ...


at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. 
Encountered "year =" at line 4, column 51.
Was expecting one of:
"ARRAY" ...
"CASE" ...
"CURRENT" ...
"CURRENT_CATALOG" ...
"CURRENT_DATE" ...
"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
"CURRENT_PATH" ...
"CURRENT_ROLE" ...
"CURRENT_SCHEMA" ...
"CURRENT_TIME" ...
"CURRENT_TIMESTAMP" ...
"CURRENT_USER" ...
"DATE" ...
"EXISTS" ...
"FALSE" ...
"INTERVAL" ...
"LOCALTIME" ...
"LOCALTIMESTAMP" ...
"MULTISET" ...
"NEW" ...
"NEXT" ...
"NOT" ...
"NULL" ...
"PERIOD" ...
"SESSION_USER" ...
"SYSTEM_USER" ...
"TIME" ...
"TIMESTAMP" ...
"TRUE" ...
"UNKNOWN" ...
"USER" ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
"?" ...
"+" ...
"-" ...
 ...
 ...
 ...
 ...
 ...
"CAST" ...
"EXTRACT" ...
"POSITION" ...
"CONVERT" ...
"TRANSLATE" ...
"OVERLAY" ...
"FLOOR" ...
"CEIL" ...
"CEILING" ...
"SUBSTRING" ...
"TRIM" ...
"CLASSIFIER" ...
"MATCH_NUMBER" ...
"RUNNING" ...
"PREV" ...
"JSON_EXISTS" ...
"JSON_VALUE" ...
"JSON_QUERY" ...
"JSON_OBJECT" ...
"JSON_OBJECTAGG" ...
"JSON_ARRAY" ...
"JSON_ARRAYAGG" ...
"MAP" ...
"SPECIFIC" ...
"ABS" ...
"AVG" ...
"CARDINALITY" ...
"CHAR_LENGTH" ...
"CHARACTER_LENGTH" ...
"COALESCE" ...
"COLLECT" ...
"COVAR_POP" ...
"COVAR_SAMP" ...
"CUME_DIST" ...
"COUNT" ...
 

Re: Re: Flink sql cross-database queries

2020-05-27 Posted by Rui Li
What error do you get when reading the Hive partitioned table? Could you paste the stack trace?

On Wed, May 27, 2020 at 6:08 PM Zhou Zach  wrote:

>
>
> hive partition table:
>
>
> CREATE TABLE `dwd.bill`(
>   `id` bigint,
>   `gid` bigint,
>   `count` bigint,
>   `price` bigint,
>   `srcuid` bigint,
>   `srcnickname` string,
>   `srcleftmoney` bigint,
>   `srcwealth` bigint,
>   `srccredit` decimal(10,0),
>   `dstnickname` string,
>   `dstuid` bigint,
>   `familyid` int,
>   `dstleftmoney` bigint,
>   `dstwealth` bigint,
>   `dstcredit` decimal(10,0),
>   `addtime` bigint,
>   `type` int,
>   `getmoney` decimal(10,0),
>   `os` int,
>   `bak` string,
>   `getbonus` decimal(10,0),
>   `unionbonus` decimal(10,0))
> PARTITIONED BY (
>   `year` int,
>   `month` int)
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
>
>
>
>
> Query:
>
>
> tableEnv.sqlUpdate(
>   """
> |
> |INSERT INTO catalog2.dwd.orders
> |select srcuid, price from catalog2.dwd.bill where year = 2020
> |
> |
> |""".stripMargin)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-05-27 18:01:19, "Leonard Xu" wrote:
> >Flink does support Hive partitioned tables. I saw you pasted it in another thread; could you put your Hive table DDL and the query in this mail?
> >
> >Best,
> >Leonard Xu
> >
> >> On May 27, 2020, at 17:40, Zhou Zach wrote:
> >>
> >>
> >>
> >>
> >> Thanks for the reply. Prefixing the table name with the catalog and database works now.
> >> Now I hit another problem: when Flink reads a Hive
> partitioned table, filtering on a partition key such as year in the where clause fails, while filtering on other columns works fine. Does Flink not support Hive partitioned tables, or is something not set up correctly?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On 2020-05-27 17:33:11, "Leonard Xu" wrote:
> >>> Hi,
>  because one HiveCatalog can only be associated with one database
> >>> A catalog can be associated with multiple databases; tables in different catalogs and different databases can all be accessed.
> >>>
> >>> Flink SQL> show catalogs;
> >>> default_catalog
> >>> myhive
> >>> Flink SQL> use catalog myhive;
> >>> Flink SQL> show databases;
> >>> default
> >>> hive_test
> >>> hive_test1
> >>> Flink SQL> select * from hive_test.db2_table union select * from
> myhive.hive_test1.db1_table;
> >>> 2020-05-27 17:25:48,565 INFO  org.apache.hadoop.hive.conf.HiveConf
> >>>
> >>>
> >>>
> >>> Best,
> >>> Leonard Xu
> >>> 
> >>> 
>  On May 27, 2020, at 10:55, Zhou Zach wrote:
>  
>  hi all,
>  Is it true that Flink SQL's HiveCatalog cannot work across databases? One of my Flink
> SQL joins involves two tables in two different databases, but a HiveCatalog can only be associated with one database.
>


-- 
Best regards!
Rui Li


Re: Is Flink SQL FIRST_VALUE in append mode?

2020-05-27 Posted by Benchao Li
Your understanding is correct. You could try a time windowed join [1]; no matter which join type you use, its result is always append-only.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
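For reference, a minimal sketch of what such a time windowed (interval) join could look like for the query quoted below; it assumes both source views additionally expose a processing-time attribute named proc_time declared on the Kafka sources, so the column list and interval here are placeholders rather than the original job:

    bsTableEnv.sqlUpdate(
      """ INSERT INTO Keystats_airpay_producer
        | SELECT m.`database`, m.`table`, m.`transaction_type`, m.`transaction_id`,
        |        m.`merchant_id`, m.`event_time`, m.`status`
        | FROM main_table m
        | LEFT JOIN merchant_table r
        |   ON m.reference_id = r.transaction_sn
        |  AND r.proc_time BETWEEN m.proc_time - INTERVAL '10' MINUTE
        |                      AND m.proc_time + INTERVAL '10' MINUTE
        |""".stripMargin)

Because the join condition is bounded by a time interval on both sides, the result stays append-only and can be written to the Kafka sink.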

macia kk wrote on Wed, May 27, 2020 at 6:56 PM:

> Thanks Benchao
>
> I see. I was using Spark Structured Streaming before; its model may differ from Flink's: it buffers rows that have not yet joined and only emits them
> after the watermark has passed.
>
> In Flink, a newly arriving row is joined with the buffered rows on the right side; if nothing matches, a null result is emitted first, but the row is still buffered. If a new right-side row later joins
> a left row that was already emitted, a retract is produced. (My understanding.)
>
>
> Is there another solution for my case? My sink (Kafka) is consumed by Druid downstream, where data is indexed directly for queries, so the retract
> scenario is not supported.
>
>
>
>
> Benchao Li wrote on Wed, May 27, 2020 at 6:32 PM:
>
> > There are many scenarios that produce retract messages; there is no document covering them all yet, so let me list a few typical ones:
> > 1. Regular group by: aggregation results are emitted continuously, so updating an aggregate retracts the old result
> > 2. Non-inner/anti joins (excluding time interval
> > joins): if a row cannot be joined right now, a null is emitted, but matching data may arrive later on the other side, so the emitted null has to be retracted
> > 3. Deduplication that keeps the latest row
> > 4. Top-N: ranking changes require the result to be updated
> > 5. Window + early emit: results emitted early need retracts to be updated
> >
> > macia kk wrote on Wed, May 27, 2020 at 6:19 PM:
> >
> > > Thanks Benchao and Leonard for the replies
> > >
> > > What I got wrong was assuming that a left join is append mode: for a left-side row, a result is emitted
> > > as soon as it joins. But in which cases are retract messages produced?
> > >
> > > Leonard Xu wrote on Wed, May 27, 2020 at 3:50 PM:
> > >
> > > > Hi
> > > > The Kafka sink is an append stream sink, so the query must be append-only, because the Kafka
> > > > sink cannot handle retract messages.
> > > > The left join in your SQL is a dual-stream join, which produces retract messages; the mismatch between the query and the sink is detected at compile time and throws the error you pasted.
> > > >
> > > > In the business scenarios we usually see, data written to Kafka does not need to be updated; Kafka mainly serves as a message queue. If the result
> > > > data needs to be updated, it is usually written to a database; the jdbc, hbase and es connector sinks are all upsert sinks.
> > > >
> > > >
> > > > Best,
> > > > Leonard Xu
> > > >
> > > >
> > > >
> > > > > On May 27, 2020, at 10:23, Benchao Li wrote:
> > > > >
> > > > > Moreover, part of your SQL does produce retract messages:
> > > > > a regular left join is used here, and this join type produces retract results.
> > > > >
> > > > >   | FROM (
> > > > >   |SELECT `database`, `table`,
> > > > > `transaction_type`, `transaction_id`,
> > > > >   |`merchant_id`, `event_time`,
> `status`,
> > > > > `reference_id`
> > > > >   |FROM main_table
> > > > >   |LEFT JOIN merchant_table
> > > > >   |ON main_table.reference_id =
> > > > > merchant_table.transaction_sn
> > > > >   | )
> > > > >
> > > > >
> > > > > macia kk wrote on Wed, May 27, 2020 at 1:20 AM:
> > > > >
> > > > >> Hi everyone, could someone take a look at this issue when you have time?
> > > > >>
> > > > >> Source: Kafka
> > > > >> Sink: Kafka
> > > > >>
> > > > >> The main logic is: after *main_table* left joins *merchant_table*, use the FIRST_VALUE
> > > function to take the first
> > > > >> transaction_id. This should be append mode, but apparently it is not
> > > > >>
> > > > >> Error
> > > > >>
> > > > >> org.apache.flink.client.program.ProgramInvocationException: The
> main
> > > > >> method caused an error: AppendStreamTableSink requires that Table
> > has
> > > > >> only insert changes.
> > > > >>at
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > > > >>at
> > > > >>
> > > >
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > > > >>at
> > > > >>
> > >
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> Code
> > > > >>
> > > > >>   val main_column = "`database`, `table`, `transaction_type`,
> > > > >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
> > > > >> `status`"
> > > > >>val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
> > > > >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
> > > > >>bsTableEnv.createTemporaryView("main_table", main_table)
> > > > >>
> > > > >>val merchant_column = "transaction_sn, user_id"
> > > > >>val merchant_table = bsTableEnv.sqlQuery(s"SELECT
> > $merchant_column
> > > > >> FROM Keystats_airpay_consumer WHERE `table` LIKE
> > > > >> 'wallet_id_merchant_db%' ")
> > > > >>bsTableEnv.createTemporaryView("merchant_table",
> merchant_table)
> > > > >>
> > > > >>bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
> > > > >>   | SELECT `database`, `table`,
> > > > >> `transaction_type`,
> > > > >>   |   `merchant_id`, `event_time`,
> `status`,
> > > > >>   |FIRST_VALUE(`transaction_id`) OVER
> > > > >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
> > > > >> PRECEDING)
> > > > >>   | FROM (
> > > > >>   |SELECT `database`, `table`,
> > > > >> `transaction_type`, `transaction_id`,
> > > > >>   |`merchant_id`, `event_time`,
> > `status`,
> > > > >> `reference_id`
> > > > >>   |FROM main_table
> > > > >>   |LEFT JOIN merchant_table
> > > > >>   |ON main_table.reference_id =
> > > > >> merchant_table.transaction_sn
> > > > >>   

Re: Is Flink SQL FIRST_VALUE in append mode?

2020-05-27 Posted by macia kk
Thanks Benchao

I see. I was using Spark Structured Streaming before; its model may differ from Flink's: it buffers rows that have not yet joined and only emits them
after the watermark has passed.

In Flink, a newly arriving row is joined with the buffered rows on the right side; if nothing matches, a null result is emitted first, but the row is still buffered. If a new right-side row later joins
a left row that was already emitted, a retract is produced. (My understanding.)


Is there another solution for my case? My sink (Kafka) is consumed by Druid downstream, where data is indexed directly for queries, so the retract
scenario is not supported.




Benchao Li wrote on Wed, May 27, 2020 at 6:32 PM:

> There are many scenarios that produce retract messages; there is no document covering them all yet, so let me list a few typical ones:
> 1. Regular group by: aggregation results are emitted continuously, so updating an aggregate retracts the old result
> 2. Non-inner/anti joins (excluding time interval
> joins): if a row cannot be joined right now, a null is emitted, but matching data may arrive later on the other side, so the emitted null has to be retracted
> 3. Deduplication that keeps the latest row
> 4. Top-N: ranking changes require the result to be updated
> 5. Window + early emit: results emitted early need retracts to be updated
>
> macia kk wrote on Wed, May 27, 2020 at 6:19 PM:
>
> > Thanks Benchao and Leonard for the replies
> >
> > What I got wrong was assuming that a left join is append mode: for a left-side row, a result is emitted
> > as soon as it joins. But in which cases are retract messages produced?
> >
> > Leonard Xu wrote on Wed, May 27, 2020 at 3:50 PM:
> >
> > > Hi
> > > The Kafka sink is an append stream sink, so the query must be append-only, because the Kafka
> > > sink cannot handle retract messages.
> > > The left join in your SQL is a dual-stream join, which produces retract messages; the mismatch between the query and the sink is detected at compile time and throws the error you pasted.
> > >
> > > In the business scenarios we usually see, data written to Kafka does not need to be updated; Kafka mainly serves as a message queue. If the result
> > > data needs to be updated, it is usually written to a database; the jdbc, hbase and es connector sinks are all upsert sinks.
> > >
> > >
> > > Best,
> > > Leonard Xu
> > >
> > >
> > >
> > > > On May 27, 2020, at 10:23, Benchao Li wrote:
> > > >
> > > > Moreover, part of your SQL does produce retract messages:
> > > > a regular left join is used here, and this join type produces retract results.
> > > >
> > > >   | FROM (
> > > >   |SELECT `database`, `table`,
> > > > `transaction_type`, `transaction_id`,
> > > >   |`merchant_id`, `event_time`, `status`,
> > > > `reference_id`
> > > >   |FROM main_table
> > > >   |LEFT JOIN merchant_table
> > > >   |ON main_table.reference_id =
> > > > merchant_table.transaction_sn
> > > >   | )
> > > >
> > > >
> > > > macia kk wrote on Wed, May 27, 2020 at 1:20 AM:
> > > >
> > > >> Hi everyone, could someone take a look at this issue when you have time?
> > > >>
> > > >> Source: Kafka
> > > >> Sink: Kafka
> > > >>
> > > >> The main logic is: after *main_table* left joins *merchant_table*, use the FIRST_VALUE
> > function to take the first
> > > >> transaction_id. This should be append mode, but apparently it is not
> > > >>
> > > >> Error
> > > >>
> > > >> org.apache.flink.client.program.ProgramInvocationException: The main
> > > >> method caused an error: AppendStreamTableSink requires that Table
> has
> > > >> only insert changes.
> > > >>at
> > > >>
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > > >>at
> > > >>
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > > >>at
> > > >>
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> Code
> > > >>
> > > >>   val main_column = "`database`, `table`, `transaction_type`,
> > > >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
> > > >> `status`"
> > > >>val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
> > > >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
> > > >>bsTableEnv.createTemporaryView("main_table", main_table)
> > > >>
> > > >>val merchant_column = "transaction_sn, user_id"
> > > >>val merchant_table = bsTableEnv.sqlQuery(s"SELECT
> $merchant_column
> > > >> FROM Keystats_airpay_consumer WHERE `table` LIKE
> > > >> 'wallet_id_merchant_db%' ")
> > > >>bsTableEnv.createTemporaryView("merchant_table", merchant_table)
> > > >>
> > > >>bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
> > > >>   | SELECT `database`, `table`,
> > > >> `transaction_type`,
> > > >>   |   `merchant_id`, `event_time`, `status`,
> > > >>   |FIRST_VALUE(`transaction_id`) OVER
> > > >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
> > > >> PRECEDING)
> > > >>   | FROM (
> > > >>   |SELECT `database`, `table`,
> > > >> `transaction_type`, `transaction_id`,
> > > >>   |`merchant_id`, `event_time`,
> `status`,
> > > >> `reference_id`
> > > >>   |FROM main_table
> > > >>   |LEFT JOIN merchant_table
> > > >>   |ON main_table.reference_id =
> > > >> merchant_table.transaction_sn
> > > >>   | )
> > > >>   |""".stripMargin)
> > > >>
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Is Flink SQL FIRST_VALUE in append mode?

2020-05-27 Posted by Benchao Li
There are many scenarios that produce retract messages; there is no document covering them all yet, so let me list a few typical ones:
1. Regular group by: aggregation results are emitted continuously, so updating an aggregate retracts the old result
2. Non-inner/anti joins (excluding time interval
joins): if a row cannot be joined right now, a null is emitted, but matching data may arrive later on the other side, so the emitted null has to be retracted
3. Deduplication that keeps the latest row
4. Top-N: ranking changes require the result to be updated
5. Window + early emit: results emitted early need retracts to be updated
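If you want to observe these retract messages yourself while debugging, converting the query result into a retract stream prints each change together with a flag; a minimal sketch, assuming a Scala StreamTableEnvironment named bsTableEnv and a registered view named main_table (both placeholders):

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    // Regular group by, one of the retract-producing patterns listed above.
    val result = bsTableEnv.sqlQuery(
      "SELECT `merchant_id`, COUNT(*) FROM main_table GROUP BY `merchant_id`")

    // Each element is (true, row) for an insert/accumulate message
    // and (false, row) for a retract message.
    bsTableEnv.toRetractStream[Row](result).print()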

macia kk wrote on Wed, May 27, 2020 at 6:19 PM:

> Thanks Benchao and Leonard for the replies
>
> What I got wrong was assuming that a left join is append mode: for a left-side row, a result is emitted
> as soon as it joins. But in which cases are retract messages produced?
>
> Leonard Xu wrote on Wed, May 27, 2020 at 3:50 PM:
>
> > Hi
> > The Kafka sink is an append stream sink, so the query must be append-only, because the Kafka
> > sink cannot handle retract messages.
> > The left join in your SQL is a dual-stream join, which produces retract messages; the mismatch between the query and the sink is detected at compile time and throws the error you pasted.
> >
> > In the business scenarios we usually see, data written to Kafka does not need to be updated; Kafka mainly serves as a message queue. If the result
> > data needs to be updated, it is usually written to a database; the jdbc, hbase and es connector sinks are all upsert sinks.
> >
> >
> > Best,
> > Leonard Xu
> >
> >
> >
> > > On May 27, 2020, at 10:23, Benchao Li wrote:
> > >
> > > Moreover, part of your SQL does produce retract messages:
> > > a regular left join is used here, and this join type produces retract results.
> > >
> > >   | FROM (
> > >   |SELECT `database`, `table`,
> > > `transaction_type`, `transaction_id`,
> > >   |`merchant_id`, `event_time`, `status`,
> > > `reference_id`
> > >   |FROM main_table
> > >   |LEFT JOIN merchant_table
> > >   |ON main_table.reference_id =
> > > merchant_table.transaction_sn
> > >   | )
> > >
> > >
> > > macia kk wrote on Wed, May 27, 2020 at 1:20 AM:
> > >
> > >> Hi everyone, could someone take a look at this issue when you have time?
> > >>
> > >> Source: Kafka
> > >> Sink: Kafka
> > >>
> > >> The main logic is: after *main_table* left joins *merchant_table*, use the FIRST_VALUE
> function to take the first
> > >> transaction_id. This should be append mode, but apparently it is not
> > >>
> > >> Error
> > >>
> > >> org.apache.flink.client.program.ProgramInvocationException: The main
> > >> method caused an error: AppendStreamTableSink requires that Table has
> > >> only insert changes.
> > >>at
> > >>
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > >>at
> > >>
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > >>at
> > >>
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> Code
> > >>
> > >>   val main_column = "`database`, `table`, `transaction_type`,
> > >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
> > >> `status`"
> > >>val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
> > >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
> > >>bsTableEnv.createTemporaryView("main_table", main_table)
> > >>
> > >>val merchant_column = "transaction_sn, user_id"
> > >>val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column
> > >> FROM Keystats_airpay_consumer WHERE `table` LIKE
> > >> 'wallet_id_merchant_db%' ")
> > >>bsTableEnv.createTemporaryView("merchant_table", merchant_table)
> > >>
> > >>bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
> > >>   | SELECT `database`, `table`,
> > >> `transaction_type`,
> > >>   |   `merchant_id`, `event_time`, `status`,
> > >>   |FIRST_VALUE(`transaction_id`) OVER
> > >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
> > >> PRECEDING)
> > >>   | FROM (
> > >>   |SELECT `database`, `table`,
> > >> `transaction_type`, `transaction_id`,
> > >>   |`merchant_id`, `event_time`, `status`,
> > >> `reference_id`
> > >>   |FROM main_table
> > >>   |LEFT JOIN merchant_table
> > >>   |ON main_table.reference_id =
> > >> merchant_table.transaction_sn
> > >>   | )
> > >>   |""".stripMargin)
> > >>
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> >
> >
>


-- 

Best,
Benchao Li


Re: Is Flink SQL FIRST_VALUE in append mode?

2020-05-27 Posted by macia kk
Thanks Benchao and Leonard for the replies

What I got wrong was assuming that a left join is append mode: for a left-side row, a result is emitted
as soon as it joins. But in which cases are retract messages produced?

Leonard Xu wrote on Wed, May 27, 2020 at 3:50 PM:

> Hi
> The Kafka sink is an append stream sink, so the query must be append-only, because the Kafka
> sink cannot handle retract messages.
> The left join in your SQL is a dual-stream join, which produces retract messages; the mismatch between the query and the sink is detected at compile time and throws the error you pasted.
>
> In the business scenarios we usually see, data written to Kafka does not need to be updated; Kafka mainly serves as a message queue. If the result
> data needs to be updated, it is usually written to a database; the jdbc, hbase and es connector sinks are all upsert sinks.
>
>
> Best,
> Leonard Xu
>
>
>
> > On May 27, 2020, at 10:23, Benchao Li wrote:
> >
> > Moreover, part of your SQL does produce retract messages:
> > a regular left join is used here, and this join type produces retract results.
> >
> >   | FROM (
> >   |SELECT `database`, `table`,
> > `transaction_type`, `transaction_id`,
> >   |`merchant_id`, `event_time`, `status`,
> > `reference_id`
> >   |FROM main_table
> >   |LEFT JOIN merchant_table
> >   |ON main_table.reference_id =
> > merchant_table.transaction_sn
> >   | )
> >
> >
> > macia kk wrote on Wed, May 27, 2020 at 1:20 AM:
> >
> >> Hi everyone, could someone take a look at this issue when you have time?
> >>
> >> Source: Kafka
> >> Sink: Kafka
> >>
> >> The main logic is: after *main_table* left joins *merchant_table*, use the FIRST_VALUE function to take the first
> >> transaction_id. This should be append mode, but apparently it is not
> >>
> >> Error
> >>
> >> org.apache.flink.client.program.ProgramInvocationException: The main
> >> method caused an error: AppendStreamTableSink requires that Table has
> >> only insert changes.
> >>at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >>at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >>at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> Code
> >>
> >>   val main_column = "`database`, `table`, `transaction_type`,
> >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
> >> `status`"
> >>val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
> >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
> >>bsTableEnv.createTemporaryView("main_table", main_table)
> >>
> >>val merchant_column = "transaction_sn, user_id"
> >>val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column
> >> FROM Keystats_airpay_consumer WHERE `table` LIKE
> >> 'wallet_id_merchant_db%' ")
> >>bsTableEnv.createTemporaryView("merchant_table", merchant_table)
> >>
> >>bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
> >>   | SELECT `database`, `table`,
> >> `transaction_type`,
> >>   |   `merchant_id`, `event_time`, `status`,
> >>   |FIRST_VALUE(`transaction_id`) OVER
> >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
> >> PRECEDING)
> >>   | FROM (
> >>   |SELECT `database`, `table`,
> >> `transaction_type`, `transaction_id`,
> >>   |`merchant_id`, `event_time`, `status`,
> >> `reference_id`
> >>   |FROM main_table
> >>   |LEFT JOIN merchant_table
> >>   |ON main_table.reference_id =
> >> merchant_table.transaction_sn
> >>   | )
> >>   |""".stripMargin)
> >>
> >
> >
> > --
> >
> > Best,
> > Benchao Li
>
>


Re:Re: Flink sql cross-database queries

2020-05-27 Posted by Zhou Zach


hive partition table:


CREATE TABLE `dwd.bill`(
  `id` bigint,
  `gid` bigint,
  `count` bigint,
  `price` bigint,
  `srcuid` bigint,
  `srcnickname` string,
  `srcleftmoney` bigint,
  `srcwealth` bigint,
  `srccredit` decimal(10,0),
  `dstnickname` string,
  `dstuid` bigint,
  `familyid` int,
  `dstleftmoney` bigint,
  `dstwealth` bigint,
  `dstcredit` decimal(10,0),
  `addtime` bigint,
  `type` int,
  `getmoney` decimal(10,0),
  `os` int,
  `bak` string,
  `getbonus` decimal(10,0),
  `unionbonus` decimal(10,0))
PARTITIONED BY (
  `year` int,
  `month` int)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'




Query:


tableEnv.sqlUpdate(
  """
|
|INSERT INTO catalog2.dwd.orders
|select srcuid, price from catalog2.dwd.bill where year = 2020
|
|
|""".stripMargin)

















On 2020-05-27 18:01:19, "Leonard Xu" wrote:
>Flink does support Hive partitioned tables. I saw you pasted it in another thread; could you put your Hive table DDL and the query in this mail?
>
>Best,
>Leonard Xu
>
>> On May 27, 2020, at 17:40, Zhou Zach wrote:
>> 
>> 
>> 
>> 
>> Thanks for the reply. Prefixing the table name with the catalog and database works now.
>> Now I hit another problem: when Flink reads a Hive partitioned table, filtering on a partition key such as year in the where clause fails, while filtering on other columns works fine. Does Flink 
>> not support Hive partitioned tables, or is something not set up correctly?
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On 2020-05-27 17:33:11, "Leonard Xu" wrote:
>>> Hi,
 because one HiveCatalog can only be associated with one database
>>> A catalog can be associated with multiple databases; tables in different catalogs and different databases can all be accessed.
>>> 
>>> Flink SQL> show catalogs;
>>> default_catalog
>>> myhive
>>> Flink SQL> use catalog myhive;
>>> Flink SQL> show databases;
>>> default
>>> hive_test
>>> hive_test1
>>> Flink SQL> select * from hive_test.db2_table union select * from 
>>> myhive.hive_test1.db1_table;
>>> 2020-05-27 17:25:48,565 INFO  org.apache.hadoop.hive.conf.HiveConf
>>> 
>>> 
>>> 
>>> Best,
>>> Leonard Xu
>>> 
>>> 
 On May 27, 2020, at 10:55, Zhou Zach wrote:
 
 hi all,
 Is it true that Flink SQL's HiveCatalog cannot work across databases? One of my Flink 
 SQL joins involves two tables in two different databases, but a HiveCatalog can only be associated with one database.


Re: Flink sql cross-database queries

2020-05-27 Posted by Leonard Xu
Flink does support Hive partitioned tables. I saw you pasted it in another thread; could you put your Hive table DDL and the query in this mail?

Best,
Leonard Xu

> On May 27, 2020, at 17:40, Zhou Zach wrote:
> 
> 
> 
> 
> Thanks for the reply. Prefixing the table name with the catalog and database works now.
> Now I hit another problem: when Flink reads a Hive partitioned table, filtering on a partition key such as year in the where clause fails, while filtering on other columns works fine. Does Flink not support 
> Hive partitioned tables, or is something not set up correctly?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On 2020-05-27 17:33:11, "Leonard Xu" wrote:
>> Hi,
>>> because one HiveCatalog can only be associated with one database
>> A catalog can be associated with multiple databases; tables in different catalogs and different databases can all be accessed.
>> 
>> Flink SQL> show catalogs;
>> default_catalog
>> myhive
>> Flink SQL> use catalog myhive;
>> Flink SQL> show databases;
>> default
>> hive_test
>> hive_test1
>> Flink SQL> select * from hive_test.db2_table union select * from 
>> myhive.hive_test1.db1_table;
>> 2020-05-27 17:25:48,565 INFO  org.apache.hadoop.hive.conf.HiveConf
>> 
>> 
>> 
>> Best,
>> Leonard Xu
>> 
>> 
>>> On May 27, 2020, at 10:55, Zhou Zach wrote:
>>> 
>>> hi all,
>>> Is it true that Flink SQL's HiveCatalog cannot work across databases? One of my Flink 
>>> SQL joins involves two tables in two different databases, but a HiveCatalog can only be associated with one database.



Re: flink-python configuration file issue

2020-05-27 Posted by Xingbo Huang
Hi, the Python files specified with -pyfs are added to the PYTHONPATH at runtime; they are not added to the classpath.

guaishushu1...@163.com wrote on Wed, May 27, 2020 at 5:50 PM:

> When loading a Python configuration file with the -pyfs option, the file does not show up on the classpath
>
> --
> guaishushu1...@163.com
>


flink-python configuration file issue

2020-05-27 Posted by guaishushu1...@163.com
When loading a Python configuration file with the -pyfs option, the file does not show up on the classpath



guaishushu1...@163.com


Re:Re: Flink sql cross-database queries

2020-05-27 Posted by Zhou Zach



Thanks for the reply. Prefixing the table name with the catalog and database works now.
Now I hit another problem: when Flink reads a Hive partitioned table, filtering on a partition key such as year in the where clause fails, while filtering on other columns works fine. Does Flink not support 
Hive partitioned tables, or is something not set up correctly?














On 2020-05-27 17:33:11, "Leonard Xu" wrote:
>Hi,
>> because one HiveCatalog can only be associated with one database
>A catalog can be associated with multiple databases; tables in different catalogs and different databases can all be accessed.
>
>Flink SQL> show catalogs;
>default_catalog
>myhive
>Flink SQL> use catalog myhive;
>Flink SQL> show databases;
>default
>hive_test
>hive_test1
>Flink SQL> select * from hive_test.db2_table union select * from 
>myhive.hive_test1.db1_table;
>2020-05-27 17:25:48,565 INFO  org.apache.hadoop.hive.conf.HiveConf
>
>
>
>Best,
>Leonard Xu
>
>
>> On May 27, 2020, at 10:55, Zhou Zach wrote:
>> 
>> hi all,
>> Is it true that Flink SQL's HiveCatalog cannot work across databases? One of my Flink 
>> SQL joins involves two tables in two different databases, but a HiveCatalog can only be associated with one database.


Re: Flink sql cross-database queries

2020-05-27 Posted by Leonard Xu
Hi,
> because one HiveCatalog can only be associated with one database
A catalog can be associated with multiple databases; tables in different catalogs and different databases can all be accessed.

Flink SQL> show catalogs;
default_catalog
myhive
Flink SQL> use catalog myhive;
Flink SQL> show databases;
default
hive_test
hive_test1
Flink SQL> select * from hive_test.db2_table union select * from 
myhive.hive_test1.db1_table;
2020-05-27 17:25:48,565 INFO  org.apache.hadoop.hive.conf.HiveConf
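The same fully qualified, cross-database access also works from the Table API; a minimal sketch, assuming a HiveCatalog registered under the name myhive (the Hive conf dir, Hive version, and the database/table names are placeholders):

    import org.apache.flink.table.catalog.hive.HiveCatalog

    val hive = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.3.4")
    tableEnv.registerCatalog("myhive", hive)

    // Address tables in different databases with catalog.database.table names.
    val result = tableEnv.sqlQuery(
      """
        |select * from myhive.hive_test.db2_table
        |union
        |select * from myhive.hive_test1.db1_table
        |""".stripMargin)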



Best,
Leonard Xu


> On May 27, 2020, at 10:55, Zhou Zach wrote:
> 
> hi all,
> Is it true that Flink SQL's HiveCatalog cannot work across databases? One of my Flink 
> SQL joins involves two tables in two different databases, but a HiveCatalog can only be associated with one database.



Re: Is Flink SQL FIRST_VALUE in append mode?

2020-05-27 Posted by Leonard Xu
Hi
The Kafka sink is an append stream sink, so the query must be append-only, because the Kafka sink cannot handle retract messages.
The left join in your SQL is a dual-stream join, which produces retract messages; the mismatch between the query and the sink is detected at compile time and throws the error you pasted.

In the business scenarios we usually see, data written to Kafka does not need to be updated; Kafka mainly serves as a message queue. If the result
data needs to be updated, it is usually written to a database; the jdbc, hbase and es connector sinks are all upsert sinks.
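As an illustration only, such an upsert sink could be declared with DDL roughly like the following (the properties follow the old 'connector.*' descriptor style; the URL, table name, schema and credentials are placeholders). When the upstream query has a unique key, for example a GROUP BY key, the JDBC sink writes upserts instead of plain appends:

    bsTableEnv.sqlUpdate(
      """
        |CREATE TABLE result_sink (
        |  merchant_id STRING,
        |  transaction_id STRING,
        |  status STRING
        |) WITH (
        |  'connector.type' = 'jdbc',
        |  'connector.url' = 'jdbc:mysql://localhost:3306/db',
        |  'connector.table' = 'result_sink',
        |  'connector.username' = 'user',
        |  'connector.password' = 'password'
        |)
        |""".stripMargin)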


Best,
Leonard Xu



> On May 27, 2020, at 10:23, Benchao Li wrote:
> 
> Moreover, part of your SQL does produce retract messages:
> a regular left join is used here, and this join type produces retract results.
> 
>   | FROM (
>   |SELECT `database`, `table`,
> `transaction_type`, `transaction_id`,
>   |`merchant_id`, `event_time`, `status`,
> `reference_id`
>   |FROM main_table
>   |LEFT JOIN merchant_table
>   |ON main_table.reference_id =
> merchant_table.transaction_sn
>   | )
> 
> 
> macia kk wrote on Wed, May 27, 2020 at 1:20 AM:
> 
>> Hi everyone, could someone take a look at this issue when you have time?
>> 
>> Source: Kafka
>> Sink: Kafka
>> 
>> The main logic is: after *main_table* left joins *merchant_table*, use the FIRST_VALUE function to take the first
>> transaction_id. This should be append mode, but apparently it is not
>> 
>> Error
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: AppendStreamTableSink requires that Table has
>> only insert changes.
>>at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> Code
>> 
>>   val main_column = "`database`, `table`, `transaction_type`,
>> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
>> `status`"
>>val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
>> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
>>bsTableEnv.createTemporaryView("main_table", main_table)
>> 
>>val merchant_column = "transaction_sn, user_id"
>>val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column
>> FROM Keystats_airpay_consumer WHERE `table` LIKE
>> 'wallet_id_merchant_db%' ")
>>bsTableEnv.createTemporaryView("merchant_table", merchant_table)
>> 
>>bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
>>   | SELECT `database`, `table`,
>> `transaction_type`,
>>   |   `merchant_id`, `event_time`, `status`,
>>   |FIRST_VALUE(`transaction_id`) OVER
>> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
>> PRECEDING)
>>   | FROM (
>>   |SELECT `database`, `table`,
>> `transaction_type`, `transaction_id`,
>>   |`merchant_id`, `event_time`, `status`,
>> `reference_id`
>>   |FROM main_table
>>   |LEFT JOIN merchant_table
>>   |ON main_table.reference_id =
>> merchant_table.transaction_sn
>>   | )
>>   |""".stripMargin)
>> 
> 
> 
> -- 
> 
> Best,
> Benchao Li



Re: Reply: Global state

2020-05-27 Posted by a773807...@gmail.com
No, it won't. In the second keyBy, use id-name as the key and then do a reduce;
for the duplicated records, compare them on the time field and keep the value with the smallest time, which deduplicates them.
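A minimal sketch of that idea with the DataStream API, assuming a record type with id, name, value and time fields (all names and the sample data are placeholders):

    import org.apache.flink.streaming.api.scala._

    case class Event(id: String, name: String, value: String, time: Long)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events = env.fromElements(
      Event("1", "a", "x", 200L),
      Event("1", "a", "y", 100L))

    // Key by the id-name combination, then keep, per key, the record with the
    // smallest time; note that a rolling reduce re-emits an updated result
    // for every incoming record.
    val deduped = events
      .keyBy(e => s"${e.id}-${e.name}")
      .reduce((a, b) => if (a.time <= b.time) a else b)

    deduped.print()
    env.execute("dedup sketch")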



a773807...@gmail.com
 
From: star
Sent: 2020-05-27 15:45
To: user-zh
Subject: Reply: Reply: Global state
Thanks for the suggestion, but that way the records get duplicated and doubled. For now it looks like I can only rely on external storage.
 
 
 
Sent from my iPhone
 
 
-- Original message --
From: a773807...@gmail.com https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#define-keys-using-key-selector-functions
 
Best,
tison.
 
 
star <3149768...@qq.com> wrote on Tue, May 26, 2020 at 6:42 PM:
 
> 
> Is there a global state facility? I need to map the id and name of each record, i.e. if two records share the same id or name they should map to the same value. Right now I can only use operator
> state with the parallelism set to 1 to implement a global state
>
>
> Thanks
>
> Sent from my iPhone


Reply: Reply: Global state

2020-05-27 Posted by star
Thanks for the suggestion, but that way the records get duplicated and doubled. For now it looks like I can only rely on external storage.



Sent from my iPhone


-- Original message --
From: a773807...@gmail.com https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/api_concepts.html#define-keys-using-key-selector-functions
 
Best,
tison.
 
 
star <3149768...@qq.com> wrote on Tue, May 26, 2020 at 6:42 PM:
 
> 
> Is there a global state facility? I need to map the id and name of each record, i.e. if two records share the same id or name they should map to the same value. Right now I can only use operator
> state with the parallelism set to 1 to implement a global state
>
>
> Thanks
>
> Sent from my iPhone