Re: flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 不兼容问题咨询

2020-03-04 Thread JingsongLee
Hi,

Which of the following is your requirement?
- 1. An unbounded, continuous file source that monitors directories, emits newly added files, and needs to support multiple directories
- 2. Just a bounded input format that needs to read multiple files

If it is 1, this is still not supported.
If it is 2, you can use env.addSource(new InputFormatSourceFunction(..)...) to read multiple files.

Best,
Jingsong Lee
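
Sketched below is a minimal version of option 2, assuming Flink 1.8's DataStream API and a FileInputFormat whose supportsMultiPaths() returns true; TextInputFormat and the HDFS paths are only placeholders:

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;

public class MultiPathBoundedSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        TextInputFormat format = new TextInputFormat(new Path("hdfs:///data/dir1"));
        // setFilePaths only works if the format reports supportsMultiPaths() == true;
        // otherwise it throws UnsupportedOperationException.
        format.setFilePaths(new Path("hdfs:///data/dir1"), new Path("hdfs:///data/dir2"));

        // Wrapping the InputFormat in InputFormatSourceFunction bypasses
        // StreamExecutionEnvironment#readFile, which only accepts a single path.
        env.addSource(new InputFormatSourceFunction<>(format, BasicTypeInfo.STRING_TYPE_INFO))
           .print();

        env.execute("bounded multi-path source");
    }
}
```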


--
From:王智 
Send Time:2020年3月4日(星期三) 17:34
To:user-zh 
Subject:flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 
不兼容问题咨询

我在使用flink 1.8 自定义 FileInputFormat 
的时候遇到了不兼容问题,初步看了源代码,不知道自己得出的结论是否正确,并想了解一下后续趋势和进展,麻烦各位大佬抽空指点一下,先行感谢~~




问题1: StreamExecutionEnvironment 为什么要做这样的限制?ContinuousFileMonitoringFunction 
的作用是什么? 

相关的代码描述如下




StreamExecutionEnvironment 对 FileInputFormat 对象有特殊的处理逻辑
if (inputFormat instanceof FileInputFormat) {
   @SuppressWarnings("unchecked")
   FileInputFormat

Re: 使用Flink1.10.0读取hive时source并行度问题

2020-03-03 Thread JingsongLee
Hi jun,

Jira: https://issues.apache.org/jira/browse/FLINK-16413
FYI

Best,
Jingsong Lee


--
From:JingsongLee 
Send Time:2020年3月3日(星期二) 19:06
To:Jun Zhang <825875...@qq.com>; user-zh@flink.apache.org 

Cc:user-zh@flink.apache.org ; like 
Subject:Re: 使用Flink1.10.0读取hive时source并行度问题

Hi jun,

很好的建议~ 这是一个优化点~ 可以建一个JIRA

Best,
Jingsong Lee


--
From:Jun Zhang <825875...@qq.com>
Send Time:2020年3月3日(星期二) 18:45
To:user-zh@flink.apache.org ; JingsongLee 

Cc:user-zh@flink.apache.org ; like 
Subject:回复: 使用Flink1.10.0读取hive时source并行度问题



hi,jinsong:
 我想说一个问题, 我开始了自动推断,比如我设置推断的最大并行度是10,
我有一个类似的sql   select * from  mytable limit 1;
hive表mytable有超过10个文件,如果启动了10个并行度是不是有点浪费呢。
在2020年03月2日 16:38,JingsongLee 写道:
建议使用Batch模式来读取Hive table。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 16:35
To:lzljs3620...@aliyun.com 
Subject:回复: 使用Flink1.10.0读取hive时source并行度问题


我使用的是 StreamTableEnvironment,确实有碰到这个问题呢。
在2020年3月2日 16:16,JingsongLee 写道:   
 自动推断可能面临资源不足无法启动的问题

理论上不应该呀?Batch作业是可以部分运行的。

Best,
Jingsong Lee

--
From:like 
Send Time:2020年3月2日(星期一) 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com 

Subject:回复: 使用Flink1.10.0读取hive时source并行度问题


非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。 


在2020年3月2日 15:18,JingsongLee 写道:   Hi, 

1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发:
- table.exec.hive.infer-source-parallelism=true (默认使用自动推断)
- table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发)

Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 14:58
To:user-zh@flink.apache.org 
Subject:使用Flink1.10.0读取hive时source并行度问题

hi,大家好

 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的,
而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?  

Re: 使用Flink1.10.0读取hive时source并行度问题

2020-03-03 Thread JingsongLee
Hi jun,

Good suggestion. This is an optimization opportunity; feel free to open a JIRA for it.

Best,
Jingsong Lee


--
From:Jun Zhang <825875...@qq.com>
Send Time:2020年3月3日(星期二) 18:45
To:user-zh@flink.apache.org ; JingsongLee 

Cc:user-zh@flink.apache.org ; like 
Subject:回复: 使用Flink1.10.0读取hive时source并行度问题


 
hi,jinsong:
 我想说一个问题, 我开始了自动推断,比如我设置推断的最大并行度是10,
我有一个类似的sql   select * from  mytable limit 1;
hive表mytable有超过10个文件,如果启动了10个并行度是不是有点浪费呢。
在2020年03月2日 16:38,JingsongLee 写道:
建议使用Batch模式来读取Hive table。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 16:35
To:lzljs3620...@aliyun.com 
Subject:回复: 使用Flink1.10.0读取hive时source并行度问题


我使用的是 StreamTableEnvironment,确实有碰到这个问题呢。
在2020年3月2日 16:16,JingsongLee 写道:   
 自动推断可能面临资源不足无法启动的问题

理论上不应该呀?Batch作业是可以部分运行的。

Best,
Jingsong Lee

--
From:like 
Send Time:2020年3月2日(星期一) 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com 

Subject:回复: 使用Flink1.10.0读取hive时source并行度问题


非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。 


在2020年3月2日 15:18,JingsongLee 写道:   Hi, 

1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发:
- table.exec.hive.infer-source-parallelism=true (默认使用自动推断)
- table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发)

Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 14:58
To:user-zh@flink.apache.org 
Subject:使用Flink1.10.0读取hive时source并行度问题

hi,大家好

 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的,
而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?  

Re: SHOW CREATE TABLE in Flink SQL

2020-03-02 Thread JingsongLee
Hi,

Some previous discussion in [1], FYI

[1] https://issues.apache.org/jira/browse/FLINK-10230

Best,
Jingsong Lee


--
From:Jark Wu 
Send Time:2020年3月2日(星期一) 22:42
To:Jeff Zhang 
Cc:"Gyula Fóra" ; user 
Subject:Re: SHOW CREATE TABLE in Flink SQL

big +1 for this. I created an issue for "SHOW CREATE TABLE" [1]. Many database 
systems also support this.
We can also introduce "describe extended table" in the future, but that is an 
orthogonal requirement. 

Best,
Jark


[1]: https://issues.apache.org/jira/browse/FLINK-16384
On Mon, 2 Mar 2020 at 22:09, Jeff Zhang  wrote:

+1 for this, maybe we can add 'describe extended table' like hive
Gyula Fóra  于2020年3月2日周一 下午8:49写道:
Hi All!

I am looking for the functionality to show how a table was created or show all 
the properties (connector, etc.)

I could only find DESCRIBE at this point which only shows the schema.

Is there anything similar to "SHOW CREATE TABLE" or is this something that we 
should maybe add in the future?

Thank you!
Gyula 

-- 
Best Regards

Jeff Zhang  

Re: java.time.LocalDateTime in POJO type

2020-03-02 Thread JingsongLee
Hi,

I've introduced LocalDateTime type information to flink-core.
But for compatibility reasons, I reverted the modification in TypeExtractor.
It seems that at present you can only use Types.LOCAL_DATE_TIME explicitly.

[1] http://jira.apache.org/jira/browse/FLINK-12850

Best,
Jingsong Lee


--
From:KristoffSC 
Send Time:2020年3月3日(星期二) 03:47
To:user 
Subject:Re: java.time.LocalDateTime in POJO type

Hi Tzu-Li,
I think you misunderstood Oskar's question. 
The question was if there are there any plans to support Java's
LocalDateTime in Flink's "native" de/serialization mechanism. As we can read
in [1], for basic types, Flink supports all Java primitives and their boxed
form, plus void, String, Date, BigDecimal, and BigInteger.

So we have Java Date, the question is, will there be a support for
LocalDateTime? 

Thanks,
Krzysztof

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#flinks-typeinformation-class



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: 开发相关问题咨询Development related problems consultation

2020-03-02 Thread JingsongLee
Hi, welcome,

For user side,

u...@flink.apache.org is for English.
user-zh@flink.apache.org is for Chinese.

d...@flink.apache.org is for development-related discussions, so please do not 
send user questions to it.

Best,
Jingsong Lee


--
From:王博迪 
Send Time:2020年3月2日(星期一) 17:08
To:user-zh ; dev 
Subject:开发相关问题咨询Development related problems consultation

您好,
   我是你们flink的新用户,有一些开发相关的问题想咨询,问一下可以和哪个邮箱交流。
谢谢
Hello, I am a new user of Flink. I would like to ask some questions related 
to development, and I would like to know which email address I can use to communicate.

Re: 使用Flink1.10.0读取hive时source并行度问题

2020-03-02 Thread JingsongLee
I would suggest reading the Hive table in batch mode.

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 16:35
To:lzljs3620...@aliyun.com 
Subject:回复: 使用Flink1.10.0读取hive时source并行度问题

  
我使用的是 StreamTableEnvironment,确实有碰到这个问题呢。
在2020年3月2日 16:16,JingsongLee 写道:   
> 自动推断可能面临资源不足无法启动的问题

理论上不应该呀?Batch作业是可以部分运行的。

Best,
Jingsong Lee

--
From:like 
Send Time:2020年3月2日(星期一) 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com 

Subject:回复: 使用Flink1.10.0读取hive时source并行度问题


非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。 
 

在2020年3月2日 15:18,JingsongLee 写道:   Hi, 

1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发:
- table.exec.hive.infer-source-parallelism=true (默认使用自动推断)
- table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发)

Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 14:58
To:user-zh@flink.apache.org 
Subject:使用Flink1.10.0读取hive时source并行度问题

hi,大家好

 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的,
而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?

Re: 使用Flink1.10.0读取hive时source并行度问题

2020-03-02 Thread JingsongLee
> 自动推断可能面临资源不足无法启动的问题

In theory that should not happen; batch jobs can run even when only part of the requested resources are available.

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com 

Subject:回复: 使用Flink1.10.0读取hive时source并行度问题

  
非常感谢!我尝试关闭自动推断后,已经可以控制source并行度了,自动推断可能面临资源不足无法启动的问题。 
 

在2020年3月2日 15:18,JingsongLee 写道:Hi, 

1.10中,Hive source是自动推断并发的,你可以使用以下参数配置到flink-conf.yaml里面来控制并发:
- table.exec.hive.infer-source-parallelism=true (默认使用自动推断)
- table.exec.hive.infer-source-parallelism.max=1000 (自动推断的最大并发)

Sink的并发默认和上游的并发相同,如果有Shuffle,使用配置的统一并发。

Best,
Jingsong Lee


--
From:like 
Send Time:2020年3月2日(星期一) 14:58
To:user-zh@flink.apache.org 
Subject:使用Flink1.10.0读取hive时source并行度问题

hi,大家好

 我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的,
而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?  

Re: Question about runtime filter

2020-03-01 Thread JingsongLee
Hi,

Does the probe side wait for the runtime filter to be built?
Can you check the start times of the build side and the probe side? 

Best,
Jingsong Lee


--
From:faaron zheng 
Send Time:2020年3月2日(星期一) 14:55
To:user 
Subject:Question about runtime filter

Hi, everyone

These days, I am trying to implement runtime filter in flink1.10 with 
flink-sql-benchmark  according to blink. I mainly change three part of flink 
code: add runtime filter rule; modify the code gen and bloomfilter; add some 
aggregatedaccumulator  methods according to accumulator. Now, It seems runtime 
filter works in execution graph as follows:
Source: HiveTableSource(i_item_sk, i_item_id, i_rec_start_date, i_rec_end_date, 
i_item_desc, i_current_price, i_wholesale_cost, i_brand_id, i_brand, 
i_class_id, i_class, i_category_id, i_category, i_manufact_id, i_manufact, 
i_size, i_formulation, i_color, i_units, i_container, i_manager_id, 
i_product_name) TablePath: tpcds_bin_orc_2.item, PartitionPruned: false, 
PartitionNums: null, ProjectedFields: [0, 10, 12] -> Calc(select=[i_item_sk], 
where=[((i_category = _UTF-16LE'Jewelry':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE") AND (i_class = _UTF-16LE'consignment':VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE") AND RUNTIME_FILTER_BUILDER_0(i_item_sk))])  

and

Source: HiveTableSource(d_date_sk, d_date_id, d_date, d_month_seq, d_week_seq, 
d_quarter_seq, d_year, d_dow, d_moy, d_dom, d_qoy, d_fy_year, d_fy_quarter_seq, 
d_fy_week_seq, d_day_name, d_quarter_name, d_holiday, d_weekend, 
d_following_holiday, d_first_dom, d_last_dom, d_same_day_ly, d_same_day_lq, 
d_current_day, d_current_week, d_current_month, d_current_quarter, 
d_current_year) TablePath: tpcds_bin_orc_2.date_dim, PartitionPruned: false, 
PartitionNums: null, ProjectedFields: [0, 3] -> Calc(select=[d_date_sk, 
d_month_seq], where=[RUNTIME_FILTER_2(d_date_sk)])  


However, the number of records sent is the same as normal. Can anyone give 
me some advice?



Thanks 

Re: 使用Flink1.10.0读取hive时source并行度问题

2020-03-01 Thread JingsongLee
Hi, 

In 1.10, the Hive source infers its parallelism automatically. You can control this with the following options in flink-conf.yaml:
- table.exec.hive.infer-source-parallelism=true (automatic inference is the default)
- table.exec.hive.infer-source-parallelism.max=1000 (upper bound for the inferred parallelism)

The sink parallelism defaults to the parallelism of its upstream operator; if there is a shuffle, the configured global parallelism is used.

Best,
Jingsong Lee
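
As a small sketch, the same options can also be set per job instead of in flink-conf.yaml, assuming Flink 1.10's Configuration-backed TableConfig; the values below are placeholders:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveSourceParallelism {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        Configuration conf = tEnv.getConfig().getConfiguration();
        // Turn automatic inference off and fall back to the job-wide parallelism ...
        conf.setBoolean("table.exec.hive.infer-source-parallelism", false);
        // ... or keep it on but cap the inferred source parallelism.
        conf.setInteger("table.exec.hive.infer-source-parallelism.max", 100);
    }
}
```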


--
From:like 
Send Time:2020年3月2日(星期一) 14:58
To:user-zh@flink.apache.org 
Subject:使用Flink1.10.0读取hive时source并行度问题

hi,大家好

  我使用flink1.10.0读取hive表,启动时设置了全局的并行度,但是在读取的时候,发现sink并行度是受约束的,
而source的并行度不受此约束,会根据source的大小改变,大的时候并行度大到1000,请问一下怎么处理这个并行度呢?

Re: Re: Re: 求助帖:flink 连接kafka source 部署集群报错

2020-01-15 Thread JingsongLee
+user-zh


--
From:JingsongLee 
Send Time:2020年1月15日(星期三) 16:05
To:Others <41486...@qq.com>
Subject:Re: Re: Re: 求助帖:flink 连接kafka source 部署集群报错

Yes.
Another option is to use the classpath option described in [1] to add multiple jars.

BTW, please keep user-zh in CC when replying.

Best,
Jingsong Lee

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#usage


--
From:Others <41486...@qq.com>
Send Time:2020年1月15日(星期三) 15:54
To:user-zh@flink.apache.org JingsongLee 
Subject:回复: Re: Re: 求助帖:flink 连接kafka source 部署集群报错

Hi,
我的集群 是Standalone 方式部署的 是加在 Flink Master机器下么 还是每一台都要加? 加完之后是否需要重启集群?


-- 原始邮件 --
发件人: "JingsongLee";
发送时间: 2020年1月15日(星期三) 下午3:46
收件人: "Others"<41486...@qq.com>;"user-zh";
主题:  Re: Re: 求助帖:flink 连接kafka source 部署集群报错

Hi,

我怀疑的你这样打包会导致meta-inf.services的文件相互覆盖。
你试试把flink-json和flink-kafka的jar直接放入flink/lib下

Best,
Jingsong Lee


--
From:Others <41486...@qq.com>
Send Time:2020年1月15日(星期三) 15:27
To:user-zh@flink.apache.org JingsongLee 
Subject:回复: Re: 求助帖:flink 连接kafka source 部署集群报错


集群环境下应该放在哪个lib下?

一下是打包过程的log
[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob ---
[INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded 
jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2.2 in the 
shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the 
shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar.
[INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar.
[INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded 
jar.
[INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 
in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 
in the shaded jar.
[INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar.
[INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded 
jar.
[INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the 
shaded jar.
[INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar.
[INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[WARNING] janino-3.0.9.ja

Re: Re: 求助帖:flink 连接kafka source 部署集群报错

2020-01-14 Thread JingsongLee
Hi,

我怀疑的你这样打包会导致meta-inf.services的文件相互覆盖。
你试试把flink-json和flink-kafka的jar直接放入flink/lib下

Best,
Jingsong Lee


--
From:Others <41486...@qq.com>
Send Time:2020年1月15日(星期三) 15:27
To:user-zh@flink.apache.org JingsongLee 
Subject:回复: Re: 求助帖:flink 连接kafka source 部署集群报错


集群环境下应该放在哪个lib下?

一下是打包过程的log
[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob ---
[INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded 
jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2.2 in the 
shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the 
shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar.
[INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar.
[INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded 
jar.
[INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 
in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 
in the shaded jar.
[INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar.
[INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded 
jar.
[INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the 
shaded jar.
[INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the 
shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in 
the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded 
jar.
[INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar.
[INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[WARNING] janino-3.0.9.jar, flink-table-planner-blink_2.11-1.9.1.jar define 440 
overlapping classes: 
[WARNING]   - org.codehaus.janino.util.resource.ResourceCreator
[WARNING]   - org.codehaus.janino.ReflectionIClass$ReflectionIField
[WARNING]   - org.codehaus.janino.IClass$1
[WARNING]   - org.codehaus.janino.UnitCompiler$35
[WARNING]   - 
org.codehaus.janino.Java$CompilationUnit$SingleStaticImportDeclaration
[WARNING]   - org.codehaus.janino.Java$PackageMemberEnumDeclaration
[WARNING]   - org.codehaus.janino.UnitCompiler$13$1
[WARNING]   - org.codehaus.janino.Unparser
[WARNING]   - org.codehaus.janino.CodeContext$Branch
[WARNING]   - org.codehaus.janino.UnitCompiler$33$2
[WARNING]   - 430 more...
[WARNING] avatica-core-1.15.0.jar, flink-table-planner-blink_2.11-1.9.1.jar 
define 605 overlapping classes: 
[WARNING]   - org.apache.calcite.avatica.AvaticaParameter
[WARNING]   - org.apache.calcite.avatic

Re: 求助帖:flink 连接kafka source 部署集群报错

2020-01-14 Thread JingsongLee
Hi,

Did you put the JSON jar into lib/? It also looks like your user jar was not built with jar-with-dependencies, so it does not contain the JSON jar either.

Best,
Jingsong Lee


--
From:Others <41486...@qq.com>
Send Time:2020年1月15日(星期三) 15:03
To:user-zh 
Subject:求助帖:flink 连接kafka source 部署集群报错

我使用的flink 版本 是1.9.1
本地调试正常。部署集群启动时报一下错误
2020-01-15 11:57:44,255 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled 
exception.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: findAndCreateTableSource failed.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource 
failed.
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
at 
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
at 
com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 
'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
at 

Re: 求助帖: 流join场景可能出现的重复计算

2020-01-14 Thread JingsongLee
Hi ren,

Blink's deduplication feature should match your requirement. [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication

Best,
Jingsong Lee
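
A minimal sketch of the deduplication pattern from [1] applied to this case, assuming the blink planner; the table and column names (student, student_id, proctime) are illustrative:

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class StudentDeduplication {
    // Keep only the latest row per student_id, so a name UPDATE that re-emits the
    // student row does not make the downstream score join double-count scores.
    public static Table latestStudent(TableEnvironment tEnv) {
        return tEnv.sqlQuery(
            "SELECT student_id, name FROM (" +
            "  SELECT student_id, name, " +
            "         ROW_NUMBER() OVER (PARTITION BY student_id ORDER BY proctime DESC) AS row_num " +
            "  FROM student" +
            ") WHERE row_num = 1");
    }
}
```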


--
From:Caizhi Weng 
Send Time:2020年1月15日(星期三) 11:53
To:user-zh 
Subject:Re: 求助帖: 流join场景可能出现的重复计算

Hi,

Flink 目前认为所有的 source 都是 append only 的,retract、upsert 等都是内部处理时的概念,对用户是不可见的。

所以目前你只能先通过 group by 和 last_value 等方式实现功能。不过 1.11 有计划支持这样的需求。

Ren Xie  于2020年1月14日周二 下午9:30写道:

> 谢谢
>
> 考虑过group by , 实际中 一个好多字段的表, 保不准就是那个字段发生了变化.
>
> 请问 类似的双流操作在开发中常见吗, 怎么都搜不到相似的使用, 按理谁流依赖另一个流做处理 应该算常见吧
>
> 还是说我这样的需求呀 实现呀 是野路子?
>
> Yuan,Youjun  于2020年1月14日周二 下午8:22写道:
>
> > 取决于具体的场景。想到的有如下几种方案:
> > 1,group by student_id和student_name,而不是只group by
> > student_id。当然前提是修改同名名字不会推送一条消息到流1.
> > 2,过滤掉update的消息
> > 3,基于时间窗口的聚合,对于student表的数据,每n秒输出一个唯一的student_id,然后再与score流join。
> >
> > -邮件原件-
> > 发件人: xin Destiny 
> > 发送时间: Tuesday, January 14, 2020 6:39 PM
> > 收件人: user-zh@flink.apache.org
> > 主题: Re: 求助帖: 流join场景可能出现的重复计算
> >
> > Hi,
> > 如果说插入两条update操作呢,一次分数是-97,一次是97
> >
> >
> >
> >
> > Ren Xie  于2020年1月14日周二 下午6:20写道:
> >
> > > 实际场景还是有点复杂的, 便于理解 我简化成这样的,  简化后的这个, 没有实际的代码, 抱歉
> > >
> > > 大致 写一下 也就是这样了
> > > ```sql
> > > select sum(score)
> > > from
> > > student t1 inner join score t2 on t1.student_id = t2.std_id where
> > > t1.student_id = 11
> > > ```
> > > 然后
> > >
> > > ```Java
> > > String sql = ↑;
> > > Table t = tEnv.sqlQuery(sql);
> > > DataStream stream1 = tEnv.toAppendStream(t, Integer.class);
> > > stream1.keyBy("").sum("");
> > > ```
> > >
> > > 这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98
> > >
> > > update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98
> > >
> > > 因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次
> > >
> > >
> > > Caizhi Weng  于2020年1月14日周二 下午5:49写道:
> > >
> > > > Hi,
> > > >
> > > > 有可能的话,是否方便提供一下代码呢?
> > > >
> > > > Ren Xie  于2020年1月14日周二 下午5:38写道:
> > > >
> > > > > 学生
> > > > > student_id name
> > > > > 11 foo
> > > > >
> > > > > 学科分数
> > > > > id name score std_id
> > > > > 100 math 97 11
> > > > > 101 english 98 11
> > > > >
> > > > > 有如下一个场景(假设只有一个学生)
> > > > >
> > > > > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
> > > > >
> > > > > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
> > > > >
> > > > > 但此时发现学生姓名登记错误, 于是进行了修改,
> > > > > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 *
> > > > > (97
> > > +
> > > > > 98) = 390
> > > > >
> > > > > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
> > > > >
> > > > > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
> > > > >
> > > >
> > >
> >
>


Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-13 Thread JingsongLee
Thanks.
Could you try the latest 1.9 release, or 1.10, or master? Some bugs have been fixed there, and I am not sure whether this one still exists.

Best,
Jingsong Lee


--
From:Kevin Liao 
Send Time:2020年1月14日(星期二) 11:38
To:user-zh ; JingsongLee 
Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错

flink 版本是 1.9.1 release

Doc 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约 
30 多个字段,我理解这跟字段数关系不大

```
import org.apache.commons.lang3.builder.ToStringBuilder;
import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * @author liaoxu Date: 2020/1/13 Time: 12:03 下午.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Doc {

  private String suv;
  private Float factor = 1F;
  private String st;
  private String agentId;
  private Long timestamp;
  ... // omit some, omit getters and setters
```

希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy)
JingsongLee  于2020年1月14日周二 上午11:25写道:
Hi Kevin,

 这是什么版本?
 Doc类能完整提供下吗?方便我们复现。

 Best,
 Jingsong Lee


 --
 From:Kevin Liao 
 Send Time:2020年1月13日(星期一) 17:37
 To:user-zh 
 Subject:blink planner的org.apache.flink.table.api.ValidationException报错

 tEnv.connect(new Kafka()
 .version("universal")
 .topic("xxx")
 .startFromLatest()
 .property("bootstrap.servers",
 "")
 .property("group.id", ""))
 .withFormat(new Json().failOnMissingField(false).deriveSchema())
 .withSchema(new Schema()
 //.field("logger_name", Types.STRING)
 //.field("host", Types.STRING)
 //.field("@timestamp", Types.SQL_TIMESTAMP)
 //.field("_rowtime", Types.SQL_TIMESTAMP)
 //.rowtime(
 //new
 Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
 .field("doc", Types.POJO(Doc.class))
 )
 .inAppendMode()
 .registerTableSource("xxx");

 Table result = tEnv.sqlQuery(
 "SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");

 //result.printSchema();
 tEnv.toAppendStream(result,
 new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
 STRING, STRING, STRING,
 STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
 STRING, STRING, STRING,
 STRING, LONG, STRING, INT, STRING, INT)).print();



 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:


 、、、

 Exception in thread "main"
 org.apache.flink.table.api.ValidationException: Type
 LEGACY(PojoType) of
 table field 'doc' does not match with type
 PojoType of the field
 'doc' of the TableSource return type.
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at 
org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
  at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
  at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
  at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
  at 
org.apache.flink.table.planner.plan.nodes.physica

Re: 注册table时catalog无法变更

2020-01-07 Thread JingsongLee
Hi xiyueha,

You can use the DDL approach, TableEnv.sqlUpdate("create table ..."), which registers the table into the current catalog.

Best,
Jingsong Lee
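
A minimal sketch of that DDL route, assuming a blink-planner TableEnvironment; the catalog name and the connector properties are placeholders:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterIntoCurrentCatalog {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Assumes a catalog named "my_catalog" was registered beforehand
        // via tEnv.registerCatalog("my_catalog", ...).
        tEnv.useCatalog("my_catalog");
        tEnv.useDatabase("default");

        // The DDL below creates the table in the *current* catalog/database set above.
        tEnv.sqlUpdate(
            "CREATE TABLE my_source (" +
            "  id BIGINT," +
            "  name VARCHAR" +
            ") WITH (" +
            "  'connector.type' = 'filesystem'," +
            "  'connector.path' = 'file:///tmp/my_source.csv'," +
            "  'format.type' = 'csv'" +
            ")");
    }
}
```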


--
From:Kurt Young 
Send Time:2020年1月8日(星期三) 09:17
To:user-zh 
Cc:xiyueha 
Subject:Re: 注册table时catalog无法变更

临时表的话只能放在指定的catalog中,不建议将临时表注册到另一个catalog,比如hive catalog。
临时表大部分情况下是不能序列化的,那样的话代码会报错。

Best,
Kurt


On Tue, Jan 7, 2020 at 9:20 PM 贺小令  wrote:

> hi,
>
> streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
> 注册的表都是Temporary Table。
>
> 你可以通过:
> catalog = new InMemoryExternalCatalog(catalogName);
> streamTableEnvironment.registerCatalog(catalogName, catalog);
> catalog.createTable()
>
> 或者
> streamTableEnvironment.getCatalog().get().createTable()
>
> 的方式来注册表到指定的catalog
>
>
> xiyu...@163.com  于2020年1月7日周二 下午3:20写道:
>
> > hi,各位:
> >
> >
> 我在开发过程中,通过下面方式注册table时,默认使用的catalog是EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
> > streamTableEnvironment.registerDataStream(tableName, dataStream,
> >
> fields);尝试通过下面方式解决,但是注册的table仍然在EnvironmentSettings.DEFAULT_BUILTIN_CATALOG中
> >   streamTableEnvironment.registerCatalog(catalogName, new
> > InMemoryExternalCatalog(catalogName));
> > streamTableEnvironment.useCatalog(catalogName);请问,我如何将table注册到指定的catalog?
> >
> >
> > xiyu...@163.com
> >
>


Re: FLINK 1.9.1 StreamingFileSink 压缩问题

2020-01-01 Thread JingsongLee
Hi,

It looks like you can only enable compression by changing the connector code:
in ParquetAvroWriters.createAvroParquetWriter, set the compression codec on the AvroParquetWriter.Builder.

Best,
Jingsong Lee
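
A sketch of such a change, assuming Flink 1.9's flink-parquet module and the standard parquet-avro builder API. It is essentially a copy of ParquetAvroWriters.forReflectRecord with a compression codec set, not the built-in factory itself:

```java
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;

import java.io.IOException;

public final class CompressedParquetAvroWriters {

    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type) {
        final String schemaString = ReflectData.get().getSchema(type).toString();
        final ParquetBuilder<T> builder =
                out -> createWriter(schemaString, ReflectData.get(), out);
        return new ParquetWriterFactory<>(builder);
    }

    private static <T> ParquetWriter<T> createWriter(
            String schemaString, ReflectData dataModel, OutputFile out) throws IOException {
        final Schema schema = new Schema.Parser().parse(schemaString);
        return AvroParquetWriter.<T>builder(out)
                .withSchema(schema)
                .withDataModel(dataModel)
                // The only change compared to the built-in factory: pick a codec here.
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();
    }
}
```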


--
From:USERNAME 
Send Time:2020年1月2日(星期四) 13:36
To:user-zh 
Subject:FLINK 1.9.1 StreamingFileSink 压缩问题

各位好,FLINK 1.9.1 使用 StreamingFileSink 写Parquet到HDFS,能启用压缩吗?

--代码
StreamingFileSink sink = StreamingFileSink
.forBulkFormat(new Path(FILE_HDFS_PATH), 
ParquetAvroWriters.forReflectRecord(HDFSBean.class))
.withBucketAssigner(new DateTimeBucketAssigner<>(FILE_HDFS_FORMAT))

.build();



Re: How should i set the field type in mysql when i use temporal table join between kafka and jdbc ?

2020-01-01 Thread JingsongLee
Hi,

Since this is the user-zh list, I'll reply in Chinese.
You need to set the field type to BIGINT.
What exactly is the error message?

Best,
Jingsong Lee


--
From:刘世民 
Send Time:2020年1月2日(星期四) 13:47
To:user-zh 
Subject:How should i set the field type in mysql when i use temporal table join 
between kafka and jdbc ?


for example, num_count field type is Long, but no matter if i set it to bigint 
or something else in mysql table, it has always report errorr...
so,what can i should set num_count field type in mysql? thanks

Best !
amenhub



Re: StreamTableEnvironment.registerDatastream() 开放用户自定义的schemaDescriptionh和DeserializationSchema

2019-12-31 Thread JingsongLee
Hi aven,

This is a reasonable request.
The current situation is:
- Flink's Table API only supports Row, POJO, Tuple, and case classes as structured data types.
- Your type is JSONObject, which is also a structured type, but Flink does not support it today, so a DeserializationSchema-style mechanism could be one way to support it.

That said, I think there is little practical difference: a RowDeserializationSchema you would provide is essentially a JSONObject-to-Row conversion, so you can simply apply that conversion in DataStream.map and turn the stream into a structured type that the
 Table API already supports.

Best,
Jingsong Lee
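
A minimal sketch of that DataStream.map route, assuming fastjson's JSONObject and the Flink 1.9 Java bridge API; the field names and types are illustrative:

```java
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class JsonObjectToTable {

    public static void register(StreamTableEnvironment tEnv, DataStream<JSONObject> jsonStream) {
        // Declare the structure once; this plays the role of the "schema description".
        RowTypeInfo rowType = new RowTypeInfo(
                new TypeInformation<?>[]{Types.STRING, Types.LONG},
                new String[]{"name", "cnt"});

        // The JSONObject -> Row conversion is just a map function.
        DataStream<Row> rows = jsonStream
                .map(json -> Row.of(json.getString("name"), json.getLongValue("cnt")))
                .returns(rowType);

        tEnv.registerDataStream("my_table", rows, "name, cnt");
    }
}
```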


--
From:aven.wu 
Send Time:2019年12月31日(星期二) 14:09
To:user-zh@flink.apache.org 
Subject:回复: StreamTableEnvironment.registerDatastream() 
开放用户自定义的schemaDescriptionh和DeserializationSchema

你好!
“把 JSONObject类型定义成object类型” 可以解决在确定字段和类型的情况下并且需要编码到程序中。
如果能开放这部分的能力,可以不通过编码(新增POJO)的方式来完成一个Datastream 到 stream 的table注册。

best wish
发送自 Windows 10 版邮件应用

发件人: Terry Wang
发送时间: 2019年12月30日 12:37
收件人: user-zh@flink.apache.org
主题: Re: StreamTableEnvironment.registerDatastream() 
开放用户自定义的schemaDescriptionh和DeserializationSchema

你这种需求的一种解决思路,可以把 
JSONObject类型定义成object类型,然后注册成table之后通过一个UDTF把JSONObject转换成特定的schema。

Best,
Terry Wang



> 2019年12月27日 19:56,aven.wu  写道:
> 
> StreamTableEnvironment.registerDatastream(),目前只支持pojo 或者 
> 是public属性的对象,根据默认字段或者用户指定的字段注册table,但某些场景下没有固定的格式,比如使用JSONObject类型DataStream就无法通过这种方法注册成table,是否可以提供更底层的API来使table注册的灵活性更高。用户传入schema的描述和自定义的解析器DeserializationSchema.
> 
> 
> 发送自 Windows 10 版邮件应用
> 




Re: Flink1.9批任务yn和ys对任务的影响

2019-12-26 Thread JingsongLee
SQL batch jobs disable slot sharing.

Best,
Jingsong Lee


--
From:faaron zheng 
Send Time:2019年12月26日(星期四) 17:23
To:user-zh@flink.apache.org 
Subject:回复:Flink1.9批任务yn和ys对任务的影响

了解了,感谢三位。我的slot上包括一个hash-join一个hash-agg,加起来刚好256mb。不过因为存在slotsharing的原因,感觉并不容易提前判断。
 faaron zheng 邮箱:faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 
15:09,JingsongLee 写道: Hi faaron zheng, 如kurt所说,强烈建议使用1.10,现在已拉分支。 
TM运行的一个经验值是:TM有10个Slot,TM内存10G:JVM堆内4G、1G网络buffer、manage内存5G(也就是说单个slot的manage内存500M)。
 Best, Jingsong Lee 
-- From:Kurt 
Young  Send Time:2019年12月26日(星期四) 14:07 To:user-zh 
 Subject:Re: Flink1.9批任务yn和ys对任务的影响 
也可以试下最新的1.10版本,这个版本里面 sql 的算子已经不再申请固定写死的内存数量, 而是根据当时 slot 能提供多少 managed 
内存来自适应了。 Best, Kurt On Thu, Dec 26, 2019 at 1:36 PM Xintong Song 
 wrote: > slot需要多少内存是和具体作业相关的,不同作业差别会比较大。 > > 
slot的资源需求是根据所有算子的资源需求相加得到的,如果你对你的作业用到了哪些算子比较了解的话,可以根据算子的资源需求推算出来。 > 
算子的默认资源需求可以参考 [1],里面有五个“table.exec.resource.*”的配置项,也可以调整这些配置项来更改算子使用的内存。 > > 
如果对作业使用到的算子不是很了解的话,那比较简单的办法还是直接提交作业试试看,去日志里面搜"Request slot with > 
profile"就能够看到slot的资源需求。 > > Thank you~ > > Xintong Song > > > [1] > > 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options
 > > On Thu, Dec 26, 2019 at 11:36 AM faaron zheng  > 
wrote: > > > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed > > 
memory为2g,也就是一个slot平均200m,所以任务没调度起来。 > > 
但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱: > > 
faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 11:23,faaron zheng 写道: > > 
感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed > > 
memory为2g,也就是一个slot平均200m,所以任务没调度起来。 > > 
但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱: > > 
faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月25日 11:30,Xintong Song 写道: > > Hi 
faaron, Flink 1.9 中 -yn参数应该是不生效的,后续版本中已经删除了这个参数。 根据你的参数,在每个 TM > > 
的内存为30G不变的情况下,每个 TM 中的slot个数(-ys)从5变成10,也就意味着平均每个slot占用的内存变为了原来的一半。 Flink > > 
1.9 的sql batch 算子对 flink managed memory 是有确定的需求的,很可能是这个变化导致单个 slot > 的managed > 
> memory无法满足算子的资源需求了。 Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09 > > 
AM faaron zheng  wrote: > 跑tpcds的query1: flink > run > > 
-m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g > -ytm 30g 任务可以正常执行 flink > > 
run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm > 60g -ytm 30g > > 
任务在做hashjoin的时候就会失败 报错是No pooled slot available and request to > > > 
ResourceManager for new slot failed 搞不懂这有啥关系,求指教 faaron zheng 邮箱: > > > 
faaronzh...@gmail.com 签名由 网易邮箱大师 定制 >

Re: Flink1.9批任务yn和ys对任务的影响

2019-12-25 Thread JingsongLee
Hi faaron zheng,

As Kurt said, I strongly recommend using 1.10; the release branch has already been cut.

A rule of thumb for running a TM: with 10 slots and 10 GB of TM memory, use roughly 4 GB JVM heap, 1 GB network buffers, and 5 GB managed memory (i.e. about 500 MB of managed memory per slot).

Best,
Jingsong Lee


--
From:Kurt Young 
Send Time:2019年12月26日(星期四) 14:07
To:user-zh 
Subject:Re: Flink1.9批任务yn和ys对任务的影响

也可以试下最新的1.10版本,这个版本里面 sql 的算子已经不再申请固定写死的内存数量,
而是根据当时 slot 能提供多少 managed 内存来自适应了。

Best,
Kurt


On Thu, Dec 26, 2019 at 1:36 PM Xintong Song  wrote:

> slot需要多少内存是和具体作业相关的,不同作业差别会比较大。
>
> slot的资源需求是根据所有算子的资源需求相加得到的,如果你对你的作业用到了哪些算子比较了解的话,可以根据算子的资源需求推算出来。
> 算子的默认资源需求可以参考 [1],里面有五个“table.exec.resource.*”的配置项,也可以调整这些配置项来更改算子使用的内存。
>
> 如果对作业使用到的算子不是很了解的话,那比较简单的办法还是直接提交作业试试看,去日志里面搜"Request slot with
> profile"就能够看到slot的资源需求。
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options
>
> On Thu, Dec 26, 2019 at 11:36 AM faaron zheng 
> wrote:
>
> > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed
> > memory为2g,也就是一个slot平均200m,所以任务没调度起来。
> > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱:
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月26日 11:23,faaron zheng 写道:
> > 感谢回复,我确认了下,ys为10的时候,hashjoin请求的slot内存为256m,而我的tm managed
> > memory为2g,也就是一个slot平均200m,所以任务没调度起来。
> > 但是我还有个疑问,批任务如何在任务提交前确定单个slot应该分多少内存,有没有一般性的方法或经验? faaron zheng 邮箱:
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制 在2019年12月25日 11:30,Xintong Song 写道:
> > Hi faaron, Flink 1.9 中 -yn参数应该是不生效的,后续版本中已经删除了这个参数。 根据你的参数,在每个 TM
> > 的内存为30G不变的情况下,每个 TM 中的slot个数(-ys)从5变成10,也就意味着平均每个slot占用的内存变为了原来的一半。 Flink
> > 1.9 的sql batch 算子对 flink managed memory 是有确定的需求的,很可能是这个变化导致单个 slot
> 的managed
> > memory无法满足算子的资源需求了。 Thank you~ Xintong Song On Wed, Dec 25, 2019 at 11:09
> > AM faaron zheng  wrote: > 跑tpcds的query1: flink
> run
> > -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm 60g > -ytm 30g 任务可以正常执行 flink
> > run -m yarn-cluster -d -p 100 -yn 10 -ys 10 -yjm > 60g -ytm 30g
> > 任务在做hashjoin的时候就会失败 报错是No pooled slot available and request to >
> > ResourceManager for new slot failed 搞不懂这有啥关系,求指教 faaron zheng 邮箱: >
> > faaronzh...@gmail.com 签名由 网易邮箱大师 定制
>


Re: Flink实时数仓落Hive一般用哪种方式好?

2019-12-10 Thread JingsongLee
Freshness depends on the checkpoint interval.
The sink on the Flink side has no small-file compaction feature.

Best,
Jingsong Lee


--
From:陈帅 
Send Time:2019年12月10日(星期二) 21:45
To:JingsongLee 
Subject:Re: Flink实时数仓落Hive一般用哪种方式好?

我想要的streaming写就是数据实时写入HDFS文件,场景有实时数据仓库等。需要平衡实时性以及小文件过多的问题。目前处理小文件问题的方法都是在事后合并文件吗?
JingsongLee  于2019年12月10日周二 上午10:48写道:

Hi 陈帅,

1.BulkWriter.Factory接口不适合ORC, 正如yue ma所说,你需要一些改动
2.StreamingFileSink整个机制都是基于做checkpoint才会真正move文件的,不知道你所想的streaming写是什么,以及对你的业务场景有什么要求吗?

Best,
Jingsong Lee

--
From:陈帅 
Send Time:2019年12月10日(星期二) 08:21
To:user-zh@flink.apache.org ; JingsongLee 

Subject:Re: Flink实时数仓落Hive一般用哪种方式好?

1. 相比Parquet,目前StreamingFileSink支持ORC的难点在哪里呢?
2. BulkWriter是不是攒微批写文件的?
JingsongLee  于2019年12月9日周一 下午3:24写道:
Hi 帅,
 - 目前可以通过改写StreamingFileSink的方式来支持Parquet。
 (但是目前StreamingFileSink支持ORC比较难)
 - BulkWriter和批处理没有关系,它只是StreamingFileSink的一种概念。
 - 如果sync hive分区,这需要自定义了,目前StreamingFileSink没有现成的。

 在1.11中,Table层会持续深入这方面的处理,实时数仓落hive,在后续会一一解决数据倾斜、分区可见性等问题。[1]

 [1] https://issues.apache.org/jira/browse/FLINK-14249

 Best,
 Jingsong Lee


 --
 From:陈帅 
 Send Time:2019年12月8日(星期日) 10:04
To:user-zh@flink.apache.org 
 Subject:Flink实时数仓落Hive一般用哪种方式好?

 有人说直接写到HBase,再在Hive关联Hbase表
 但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白:

 1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC)
 写的话,目前来看没有现成的Streaming
 Writer,官方提供的都是
 BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于
 业务上的Update和Delete操作 数据一般是如何sync进Hive的?

 2. 写入HDFS文件后,要怎么让Hive自动发现新加的分区?能在Flink ETL程序中一并完成吗?


Re: Flink RetractStream如何转成AppendStream?

2019-12-10 Thread JingsongLee
Currently this cannot be done directly with SQL.

Best,
Jingsong Lee


--
From:陈帅 
Send Time:2019年12月10日(星期二) 21:48
To:JingsongLee 
Subject:Re: Flink RetractStream如何转成AppendStream?

代码api的方式我知道怎么转,想知道用sql的方式要如何转?需要先写到一张临时表再sink到目标表?有例子吗?
JingsongLee  于2019年12月10日周二 上午10:49写道:

 参考下lucas.wu的例子?

Best,
Jingsong Lee

--
From:陈帅 
Send Time:2019年12月10日(星期二) 08:25
To:user-zh@flink.apache.org ; JingsongLee 

Subject:Re: Flink RetractStream如何转成AppendStream?

"你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。"
==>我想知道通过Flink SQL方式要如何实现这种转换?
JingsongLee  于2019年12月9日周一 下午3:17写道:
Hi 帅,

 你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。

 Best,
 Jingsong Lee


 --
 From:Jark Wu 
 Send Time:2019年12月8日(星期日) 11:54
 To:user-zh 
 Subject:Re: Flink RetractStream如何转成AppendStream?

 Hi,

 目前 Kafka  只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持
 RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。

 Best,
 Jark

 On Sun, 8 Dec 2019 at 10:08, 陈帅  wrote:

 > 在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka
 >
 > sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?
 >


Re: Re: Flink实时数仓落Hive一般用哪种方式好?

2019-12-09 Thread JingsongLee
Hi hjxhainan,

If you want to unsubscribe,
please send an email to user-zh-unsubscr...@flink.apache.org

Best,
Jingsong Lee


--
From:hjxhai...@163.com 
Send Time:2019年12月10日(星期二) 10:52
To:user-zh ; JingsongLee ; 
陈帅 
Subject:Re: Re: Flink实时数仓落Hive一般用哪种方式好?


怎么退出邮件订阅




hjxhai...@163.com 
发件人: JingsongLee
发送时间: 2019-12-10 10:48
收件人: 陈帅; user-zh@flink.apache.org
主题: Re: Flink实时数仓落Hive一般用哪种方式好?
Hi 陈帅,
1.BulkWriter.Factory接口不适合ORC, 正如yue ma所说,你需要一些改动
2.StreamingFileSink整个机制都是基于做checkpoint才会真正move文件的,不知道你所想的streaming写是什么,以及对你的业务场景有什么要求吗?
Best,
Jingsong Lee
--
From:陈帅 
Send Time:2019年12月10日(星期二) 08:21
To:user-zh@flink.apache.org ; JingsongLee 

Subject:Re: Flink实时数仓落Hive一般用哪种方式好?
1. 相比Parquet,目前StreamingFileSink支持ORC的难点在哪里呢?
2. BulkWriter是不是攒微批写文件的?
JingsongLee  于2019年12月9日周一 下午3:24写道:
Hi 帅,
 - 目前可以通过改写StreamingFileSink的方式来支持Parquet。
 (但是目前StreamingFileSink支持ORC比较难)
 - BulkWriter和批处理没有关系,它只是StreamingFileSink的一种概念。
 - 如果sync hive分区,这需要自定义了,目前StreamingFileSink没有现成的。
 在1.11中,Table层会持续深入这方面的处理,实时数仓落hive,在后续会一一解决数据倾斜、分区可见性等问题。[1]
 [1] https://issues.apache.org/jira/browse/FLINK-14249
 Best,
 Jingsong Lee
 --
 From:陈帅 
 Send Time:2019年12月8日(星期日) 10:04
To:user-zh@flink.apache.org 
 Subject:Flink实时数仓落Hive一般用哪种方式好?
 有人说直接写到HBase,再在Hive关联Hbase表
 但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白:
 1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC)
 写的话,目前来看没有现成的Streaming
 Writer,官方提供的都是
 BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于
 业务上的Update和Delete操作 数据一般是如何sync进Hive的?
 2. 写入HDFS文件后,要怎么让Hive自动发现新加的分区?能在Flink ETL程序中一并完成吗?  

Re: Flink RetractStream如何转成AppendStream?

2019-12-09 Thread JingsongLee
How about following lucas.wu's example?

Best,
Jingsong Lee


--
From:陈帅 
Send Time:2019年12月10日(星期二) 08:25
To:user-zh@flink.apache.org ; JingsongLee 

Subject:Re: Flink RetractStream如何转成AppendStream?

"你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。"
==>我想知道通过Flink SQL方式要如何实现这种转换?
JingsongLee  于2019年12月9日周一 下午3:17写道:
Hi 帅,

 你可以先把RetractStream转成DataStream,这样就出现了Tuple的stream,然后你再写个MapFunc过滤,最后通过DataStream写入Kafka中。

 Best,
 Jingsong Lee


 --
 From:Jark Wu 
 Send Time:2019年12月8日(星期日) 11:54
 To:user-zh 
 Subject:Re: Flink RetractStream如何转成AppendStream?

 Hi,

 目前 Kafka  只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持
 RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。

 Best,
 Jark

 On Sun, 8 Dec 2019 at 10:08, 陈帅  wrote:

 > 在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka
 >
 > sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?
 >


Re: Flink实时数仓落Hive一般用哪种方式好?

2019-12-09 Thread JingsongLee
Hi 陈帅,

1. The BulkWriter.Factory interface is not a good fit for ORC; as yue ma said, you would need some changes.
2. The whole StreamingFileSink mechanism only actually moves files when a checkpoint completes. What do you mean by streaming write, and what does your business scenario require?

Best,
Jingsong Lee


--
From:陈帅 
Send Time:2019年12月10日(星期二) 08:21
To:user-zh@flink.apache.org ; JingsongLee 

Subject:Re: Flink实时数仓落Hive一般用哪种方式好?

1. 相比Parquet,目前StreamingFileSink支持ORC的难点在哪里呢?
2. BulkWriter是不是攒微批写文件的?
JingsongLee  于2019年12月9日周一 下午3:24写道:
Hi 帅,
 - 目前可以通过改写StreamingFileSink的方式来支持Parquet。
 (但是目前StreamingFileSink支持ORC比较难)
 - BulkWriter和批处理没有关系,它只是StreamingFileSink的一种概念。
 - 如果sync hive分区,这需要自定义了,目前StreamingFileSink没有现成的。

 在1.11中,Table层会持续深入这方面的处理,实时数仓落hive,在后续会一一解决数据倾斜、分区可见性等问题。[1]

 [1] https://issues.apache.org/jira/browse/FLINK-14249

 Best,
 Jingsong Lee


 --
 From:陈帅 
 Send Time:2019年12月8日(星期日) 10:04
To:user-zh@flink.apache.org 
 Subject:Flink实时数仓落Hive一般用哪种方式好?

 有人说直接写到HBase,再在Hive关联Hbase表
 但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白:

 1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC)
 写的话,目前来看没有现成的Streaming
 Writer,官方提供的都是
 BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于
 业务上的Update和Delete操作 数据一般是如何sync进Hive的?

 2. 写入HDFS文件后,要怎么让Hive自动发现新加的分区?能在Flink ETL程序中一并完成吗?


Re: [flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime?

2019-12-08 Thread JingsongLee
Hi 猫猫:

Defining a rowtime attribute in DDL is a newly supported feature, and the documentation is still being written. [1]
You can try it with the master branch; the community is preparing the 1.10 release, and a release version will be available then.

[2] contains a complete usage example, FYI.

[1] https://issues.apache.org/jira/browse/FLINK-14320
[2] 
https://github.com/apache/flink/blob/2ecf7cacbe742099d78c528de962fccf13c14629/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala

Best,
Jingsong Lee
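
A hedged sketch of what the DDL-defined rowtime from [1] looks like on current master; the syntax may still change before the 1.10 release, and the WITH properties below are abbreviated placeholders:

```java
import org.apache.flink.table.api.TableEnvironment;

public class DdlRowtimeExample {
    public static void createTable(TableEnvironment tableEnv) {
        tableEnv.sqlUpdate(
            "CREATE TABLE T_UserBehavior (" +
            "  userId BIGINT," +
            "  itemId BIGINT," +
            "  behavior VARCHAR," +
            "  optime TIMESTAMP(3)," +
            // Declares optime as the rowtime attribute with a 5-second watermark delay.
            "  WATERMARK FOR optime AS optime - INTERVAL '5' SECOND" +
            ") WITH (" +
            "  'connector.type' = 'filesystem'," +
            "  'connector.path' = 'file:///path/to/UserBehavior-less.csv'," +
            "  'format.type' = 'csv'" +
            ")");
    }
}
```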


--
From:猫猫 <16770...@qq.com>
Send Time:2019年12月6日(星期五) 17:52
To:user-zh 
Subject:[flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime?

我使用tableEnv.sqlUpdate(ddl);方式创建表


但是我无法像通过流注册方式一样,添加rowtime。我尝试在定义字段上添加rowtime,但是创表语句会报错。
请问在flink中是否支持使用该种方式创建流表,并开窗?


我的例子是一个csv源,我还没有尝试kafka源,但是我看了文档,也没有找到相关描述。


sql创表语句如下:
CREATE TABLE T_UserBehavior(
   userId BIGINT,
   itemId BIGINT,
   categoryId BIGINT,
   behavior VARCHAR,
   optime BIGINT
) WITH (
  'connector.type' = 'filesystem',   -- required: specify to 
connector type
  'connector.path' = 
'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv',
  -- required: path to a file or directory
  'format.type' = 'csv',
  'format.fields.0.name' = 'userId', -- required: define the schema 
either by using type information
  'format.fields.0.type' = 'BIGINT',
  'format.fields.1.name' = 'itemId',
  'format.fields.1.type' = 'BIGINT',
  'format.fields.2.name' = 'categoryId',
  'format.fields.2.type' = 'BIGINT',
  'format.fields.3.name' = 'behavior',
  'format.fields.3.type' = 'VARCHAR',
  'format.fields.4.name' = 'optime',
  'format.fields.4.type' = 'BIGINT'
);

Re: Flink实时数仓落Hive一般用哪种方式好?

2019-12-08 Thread JingsongLee
Hi 帅,
- Parquet can currently be supported by adapting StreamingFileSink.
  (Supporting ORC with StreamingFileSink is harder at the moment.)
- BulkWriter has nothing to do with batch processing; it is just a StreamingFileSink concept.
- Syncing Hive partitions needs custom code; StreamingFileSink has nothing built in for that today.

In 1.11, the Table layer will keep working on this area; for streaming a real-time warehouse into Hive, issues such as data skew and partition visibility will be addressed one by one. [1]

[1] https://issues.apache.org/jira/browse/FLINK-14249

Best,
Jingsong Lee


--
From:陈帅 
Send Time:2019年12月8日(星期日) 10:04
To:user-zh@flink.apache.org 
Subject:Flink实时数仓落Hive一般用哪种方式好?

有人说直接写到HBase,再在Hive关联Hbase表
但是我想直接写文件到HDFS,再通过Hive外表加载,不过有几个问题不明白:

1. 如果用行式格式实时写没有问题,StreamingFileSink也支持,但是如果我想用列式格式(如Parquet或ORC)
写的话,目前来看没有现成的Streaming
Writer,官方提供的都是
BulkWriter,只支持批处理方式。是不是不建议用列式格式进行流式写?如果能的话需要自己定义StreamingWriter吗?对于
业务上的Update和Delete操作 数据一般是如何sync进Hive的?

2. 写入HDFS文件后,要怎么让Hive自动发现新加的分区?能在Flink ETL程序中一并完成吗?


Re: Flink RetractStream如何转成AppendStream?

2019-12-08 Thread JingsongLee
+1 to lucas.wu

Best,
Jingsong Lee


--
From:lucas.wu 
Send Time:2019年12月9日(星期一) 11:39
To:user-zh 
Subject:Re: Flink RetractStream如何转成AppendStream?

可以使用类似的方式
//   val sstream = result4.toRetractStream[Row].filter(_._1 == true).map(_._2)
//   val result5 = tEnv.fromDataStream(sstream)
//   result5.toAppendStream[Row].print()


原始邮件
发件人:Jark wuimj...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2019年12月8日(周日) 11:53
主题:Re: Flink RetractStream如何转成AppendStream?


Hi, 目前 Kafka 只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持 
RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。 Best, Jark On Sun, 8 Dec 
2019 at 10:08, 陈帅 casel.c...@gmail.com wrote:  在用Flink做实时数仓时遇到group 
by统计后需要将结果发到kafka,但是现在的kafka   
sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?

Re: Flink RetractStream如何转成AppendStream?

2019-12-08 Thread JingsongLee
Hi 帅,

You can first convert the RetractStream into a DataStream, which gives you a stream of Tuples; then write a small map/filter function to drop retractions, and finally write the resulting DataStream to Kafka.

Best,
Jingsong Lee
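
A minimal sketch of that approach, assuming the Flink 1.9 Java bridge API; the Kafka producer sink itself is omitted:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractToAppend {

    public static DataStream<Row> toAppendOnly(StreamTableEnvironment tEnv, Table retractTable) {
        // f0 == true  -> add message (insert / update-after): keep it
        // f0 == false -> retraction message: drop it
        DataStream<Tuple2<Boolean, Row>> retractStream =
                tEnv.toRetractStream(retractTable, Row.class);

        return retractStream
                .filter(t -> t.f0)
                .map(t -> t.f1)
                .returns(Row.class);
        // The resulting append-only stream can be handed to a Kafka producer sink.
    }
}
```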


--
From:Jark Wu 
Send Time:2019年12月8日(星期日) 11:54
To:user-zh 
Subject:Re: Flink RetractStream如何转成AppendStream?

Hi,

目前 Kafka  只支持 append mode,所以无法消费 retract stream。在 Flink 1.11,社区将计划支持
RetractStream 转成 AppendStream 的操作,以 changelog 的方式输出。

Best,
Jark

On Sun, 8 Dec 2019 at 10:08, 陈帅  wrote:

> 在用Flink做实时数仓时遇到group by统计后需要将结果发到kafka,但是现在的kafka
>
> sink是不支持RetractStream,因此会报错,有没有什么方法将RetractStream转成AppendStream(例如,忽略撤回的消息,只发送更新的消息),从而可以将结果发到kafka?
>


Re: DML去重,translate时报错

2019-11-21 Thread JingsongLee
Hi 叶贤勋:

Deduplication does support the INSERT INTO ... SELECT syntax.
The question is why this SQL of yours does not produce a unique key;
there may be a bug in the blink planner here.
CC: @Jark Wu @godfrey he (JIRA)

Best,
Jingsong Lee


--
From:叶贤勋 
Send Time:2019年11月21日(星期四) 16:20
To:user-zh@flink.apache.org 
Subject:DML去重,translate时报错

Hi 大家好:
Flink版本1.9.0,
SQL1:
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
SQL2:

CREATE TABLE user_dist (
dt VARCHAR,
user_id VARCHAR,
behavior VARCHAR
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
'connector.table' = 'user_behavior_dup',
'connector.username' = 'root',
'connector.password' = ‘**',
'connector.write.flush.max-rows' = '1'
);
SQL3:

INSERT INTO user_dist
SELECT
  dt,
  user_id,
  behavior
FROM (
   SELECT
  dt,
  user_id,
  behavior,
 ROW_NUMBER() OVER (PARTITION BY dt, user_id, behavior ORDER BY proc desc ) 
AS rownum
   FROM (select DATE_FORMAT(ts, '-MM-dd HH:00') as 
dt,user_id,behavior,PROCTIME() as proc
from user_log) )
WHERE rownum = 1;


在对SQL3执行tableEnv.sqlUpdate(SQL3)时,报错:
Exception in thread "main" org.apache.flink.table.api.TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)


请问去重现在不支持insert into select 语法吗?


| |
叶贤勋
|
|
yxx_c...@163.com
|
签名由网易邮箱大师定制




Re: count distinct not supported in batch?

2019-09-19 Thread JingsongLee
Hi fanbin:
It is "distinct aggregates for group window" in batch sql mode.
Now,
legacy planner: not support.
blink planner: not support.
There is no clear plan yet.
But if the demand is strong, we can consider supporting it.

Best,
Jingsong Lee


--
From:Fanbin Bu 
Send Time:2019年9月20日(星期五) 06:03
To:user 
Subject:count distinct not supported in batch?

Hi,

Just found that count distinct is supported in streaming but not in batch 
(version 1.8), is there any plan to add this to batch?

SELECT
  user_id
  , hop_end(created_at, interval '30' second, interval '30' second) as bucket_ts
  , count(distinct name)
FROM $table
GROUP BY
  user_id
  , hop(created_at, interval '30' second, interval '30' second)

Thanks,
Fanbin 

Re: Streaming write to Hive

2019-09-05 Thread JingsongLee
Hi luoqi:

With partition support[1], I want to introduce a FileFormatSink to
 cover streaming exactly-once and partition-related logic for flink
 file connectors and hive connector. You can take a look.

[1] 
https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing

Best,
Jingsong Lee


--
From:Bowen Li 
Send Time:2019年9月6日(星期五) 05:21
To:Qi Luo 
Cc:user ; snake.fly318 ; 
lichang.bd 
Subject:Re: Streaming write to Hive

Hi, 

I'm not sure if there's one yet. Feel free to create one if not.
On Wed, Sep 4, 2019 at 11:28 PM Qi Luo  wrote:

Hi Bowen,

Thank you for the information! Streaming write to Hive is a very common use 
case for our users. Is there any open issue for this to which we can try 
contributing?

+Yufei and Chang who are also interested in this.

Thanks,
Qi
On Thu, Sep 5, 2019 at 12:16 PM Bowen Li  wrote:
Hi Qi,

With 1.9 out of shelf, I'm afraid not. You can make HiveTableSink implements 
AppendStreamTableSink (an empty interface for now) so it can be picked up in 
streaming job. Also, streaming requires checkpointing, and Hive sink doesn't do 
that yet. There might be other tweaks you need to make.

It's on our list for 1.10, not high priority though.

Bowen
On Wed, Sep 4, 2019 at 2:23 AM Qi Luo  wrote:
Hi guys,

In Flink 1.9 HiveTableSink is added to support writing to Hive, but it only 
supports batch mode. StreamingFileSink can write to HDFS in streaming mode, but 
it has no Hive related functionality (e.g. adding Hive partition).

Is there any easy way we can streaming write to Hive (with exactly-once 
guarantee)?

Thanks,
Qi



Re: Re: TIMESTAMP type error for a UDF defined in blink SQL on Flink 1.9

2019-09-05 Thread JingsongLee
Override the getResultType method and return Types.SQL_TIMESTAMP.
That should work around the issue.
1.10 will fix this problem.
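A minimal sketch of that workaround, applied to the UTC2Local UDF quoted below in this thread (the 8-hour offset is spelled out in milliseconds):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;

public class UTC2Local extends ScalarFunction {

    public Timestamp eval(Timestamp s) {
        // shift UTC to UTC+8: 28,800,000 ms = 8 hours
        return new Timestamp(s.getTime() + 28800000L);
    }

    // Workaround: declare the legacy SQL_TIMESTAMP type explicitly so the
    // blink planner does not derive an unsupported TIMESTAMP(9) return type.
    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.SQL_TIMESTAMP;
    }
}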

Best,
Jingsong Lee


--
From:守护 <346531...@qq.com>
Send Time:2019年9月5日(星期四) 12:11
To:user-zh@flink.apache.org JingsongLee ; user-zh 

Subject:Reply: Re: TIMESTAMP type error for a UDF defined in blink SQL on Flink 1.9

Where do I declare the DataType? Does this require importing some package? Please advise. My UDF code is as follows:

import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;


public class UTC2Local extends ScalarFunction {

    public Timestamp eval(Timestamp s) {
        // add 8 hours (28,800,000 ms)
        long timestamp = s.getTime() + 28800000L;
        return new Timestamp(timestamp);
    }
}



-- Original Message --
From: "JingsongLee";
Sent: Thursday, September 5, 2019, 11:55 AM
To: "user-zh";
Subject: Re: TIMESTAMP type error for a UDF defined in blink SQL on Flink 1.9

Did you declare the DataType? How did you write the code?
Since only precision <= 3 is currently supported, you have to express it as DataTypes.TIMESTAMP(3).

Best,
Jingsong Lee


--
From:守护 <346531...@qq.com>
Send Time:2019年9月5日(星期四) 11:48
To:user-zh 
Subject:TIMESTAMP type error for a UDF defined in blink SQL on Flink 1.9

Hi everyone in the community,


Use case: on Flink 1.9, a UDF created and used through Flink SQL works without problems, but after switching to blink SQL the same UDF fails with a TIMESTAMP type error. The UDF's logic is very simple: it just adds 8 hours to a timestamp. The error message is as follows:


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Not support dataType: TIMESTAMP(9)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at 
java.security.AccessController.doPrivileged(Native Method)
at 
javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
at 
org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
at 
org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
at 
org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)

Re: Reply: About the Flink SQL DISTINCT question

2019-09-04 Thread JingsongLee
Deduplication is usually grouped by time (for example, by day), and the state is configured with an expiration timeout.
The basic deduplication mechanism relies on state (for example, RocksDB state).
Mini-batch is used to reduce the number of accesses to state.

If there is data skew, that is a separate topic of how to resolve skew.
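As an illustrative sketch of how that state timeout is typically configured on the Table API side (the API call and the retention values below are given only as an example, not taken from this thread):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.TableConfig;

public final class DedupStateTtlExample {
    // Configure idle state retention on the TableConfig obtained from the table environment.
    public static void configure(TableConfig config) {
        // State for keys that stay idle longer than the minimum retention time becomes
        // eligible for cleanup; pick values that match the grouping period (e.g. one day).
        config.setIdleStateRetentionTime(Time.hours(24), Time.hours(36));
    }
}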

Best,
Jingsong Lee


--
From:lvwenyuan 
Send Time:2019年9月4日(星期三) 15:11
To:user-zh 
Subject:Re: Reply: About the Flink SQL DISTINCT question

Right, it is definitely deduplicated per window. I just want to ask what approach is used when deduplicating within a window.
On 2019-09-04 14:38:29, "athlon...@gmail.com" wrote:
>Deduplication happens within a window; the deduplication data cannot be kept forever.
>
>
>
>athlon...@gmail.com
> 
>From: lvwenyuan
>Sent: 2019-09-04 14:28
>To: user-zh
>Subject: About the Flink SQL DISTINCT question
>Hi everyone:
>   I would like to ask about real-time deduplication in Flink SQL, that is, count(distinct user_id).
> How does Flink implement real-time deduplication internally, and will there be performance problems when deduplicating a relatively large data volume in real time? I am using the Blink planner.


Re: Flink SQL time issue

2019-09-03 Thread JingsongLee
Hi:
1. Yes, at the moment it can only be UTC. If this affects your computation, you can consider adjusting the business-side window time accordingly.
2. Long is supported for the event-time field. Is your input an int? That would explain the error. What is the exact error message?

Best,
Jingsong Lee


--
From:hb <343122...@163.com>
Send Time:2019年9月3日(星期二) 10:44
To:user-zh 
Subject:Flink SQL time issue

Using the Kafka ConnectorDescriptor to read JSON-format data from Kafka and create a Table


```
  ...


  schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()


  schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
  new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000)
)
```


Question 1. The generated _proctime processing-time field is displayed in the UTC time zone; how can it be adjusted to the +8 time zone?
Question 2. How can the eventTime event-time field support the Long type?


The record I write to Kafka is {"eventTime": 10, "id":1,"name":"hb"}, and it complains about the type of the eventTime field.

Re:question

2019-09-03 Thread JingsongLee
It should be schema.field("msg", Types.ROW(...)).
And then you should select msg.f1 from the table.
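For illustration, a sketch of declaring msg as a nested ROW in both the format and the schema (the table name in the query comment is just a placeholder):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;

public class NestedRowSchemaSketch {
    public static void declare() {
        TypeInformation<?> msgType =
                Types.ROW(new String[] { "f1" }, new TypeInformation<?>[] { Types.STRING() });

        // Format: the JSON payload contains id, serial and the nested msg object.
        Json format = new Json().schema(
                Types.ROW(new String[] { "id", "serial", "msg" },
                        new TypeInformation<?>[] { Types.STRING(), Types.STRING(), msgType }));

        // Schema: declare the nested object as a ROW field instead of a flat string field.
        Schema schema = new Schema()
                .field("id", Types.STRING())
                .field("serial", Types.STRING())
                .field("msg", msgType);

        // After registering the table: SELECT id, serial, msg.f1 FROM myTable
    }
}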

Best
Jingsong Lee




Sent from Ali Mail for iPhone
 --Original Mail --
From:圣眼之翼 <2463...@qq.com>
Date:2019-09-03 09:22:41
Recipient:user 
Subject:question
How do you do:
My problem is flink table format and table schema mapping.
The input data is similar to the following json format:
{ "id": "123", "serial": "6b0c2d26", "msg": { "f1": "5677" } } The format code 
for TableSource is as follows: new Json().schema(Types.ROW(new String[] { "id", 
"serial", "msg" }, new TypeInformation << ? > [] { Types.STRING(), 
Types.STRING(), Types.ROW(new String[] { "f1" }, new TypeInformation << ? > [] 
{ Types.STRING() }) }));

The schema part of TableSource is as follows:
Schema schema = new Schema(); schema.field("id", Types.STRING()); 
schema.field("serial", Types.STRING());

I don't know how to define the f1 field of msg in the schema. I tried 
schema.field("f1", Types.STRING()) before; but I will report an error. What is 
the correct method?
The following SQL can be run correctly:
select id,serial,f1 from table; 

My flink version is 1.8.1,use flink table & SQL API

thanks;

Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-22 Thread JingsongLee
Congratulations~~~ Thanks gordon and everyone~

Best,
Jingsong Lee


--
From:Oytun Tez 
Send Time:2019年8月22日(星期四) 14:06
To:Tzu-Li (Gordon) Tai 
Cc:dev ; user ; announce 

Subject:Re: [ANNOUNCE] Apache Flink 1.9.0 released

Congratulations team; thanks for the update, Gordon.

---
Oytun Tez

M O T A W O R D
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com

On Thu, Aug 22, 2019 at 8:03 AM Tzu-Li (Gordon) Tai  wrote:

The Apache Flink community is very happy to announce the release of Apache 
Flink 1.9.0, which is the latest major release.

Apache Flink(r) is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this new major release:
https://flink.apache.org/news/2019/08/22/release-1.9.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12344601

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Cheers,
Gordon


Re: [Discuss] What should the "Data Source" be translated into Chinese

2019-08-13 Thread JingsongLee
Could we just keep it as-is, without translating it?

Best,
Jingsong Lee


--
From:WangHengwei 
Send Time:2019年8月13日(星期二) 11:50
To:user-zh 
Subject:[Discuss] What should the "Data Source" be translated into Chinese

Hi all,


I'm working on [FLINK-13405] Translate "Basic API Concepts" page into 
Chinese. I have a problem. 

Usually we translate "Data Source" into "数据源" but there is no agreed 
translation for "Data Sink". Since it often appears in documents, I think we'd 
better to have a unified translation. I have some alternatives, e.g. 
"数据沉淀","数据归" or "数据终".

 Committer Xingcan Cui has a good suggestion for "数据汇" which corresponds to 
source ("数据源"). I asked Committer Jark Wu, he is also fine with it. I think 
"数据汇" is a good representation of flow charactiristics so I would like to use 
it. 


I want to hear more thoughts from the community whether we should translate 
it and what it should be translated into.


Thanks,
WangHW

Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-07 Thread JingsongLee
Congrats Hequn!

Best,
Jingsong Lee


--
From:Biao Liu 
Send Time:2019年8月7日(星期三) 12:05
To:Zhu Zhu 
Cc:Zili Chen ; Jeff Zhang ; Paul Lam 
; jincheng sun ; dev 
; user 
Subject:Re: [ANNOUNCE] Hequn becomes a Flink committer

Congrats Hequn!

Thanks,
Biao /'bɪ.aʊ/



On Wed, Aug 7, 2019 at 6:00 PM Zhu Zhu  wrote:

Congratulations to Hequn!

Thanks,
Zhu Zhu
Zili Chen  于2019年8月7日周三 下午5:16写道:
Congrats Hequn!

Best,
tison.

Jeff Zhang  于2019年8月7日周三 下午5:14写道:
Congrats Hequn!

Paul Lam  于2019年8月7日周三 下午5:08写道:
Congrats Hequn! Well deserved!
Best,
Paul Lam 

在 2019年8月7日,16:28,jincheng sun  写道:
Hi everyone,

I'm very happy to announce that Hequn accepted the offer of the Flink PMC to 
become a committer of the Flink project.

Hequn has been contributing to Flink for many years, mainly working on 
SQL/Table API features. He's also frequently helping out on the user mailing 
lists and helping check/vote the release.

Congratulations Hequn!

Best, Jincheng 
(on behalf of the Flink PMC)



-- 
Best Regards

Jeff Zhang

Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread JingsongLee
Hi caizhi and kali:

I think this table should use toRetractStream instead of toAppendStream, and
you should handle the retract messages. (If you just use DISTINCT, the messages
should always be accumulate messages.)
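A short Java sketch of consuming the result as a retract stream before writing it out (Tuple2.f0 is the accumulate/retract flag; the filtering step is only an illustration):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractStreamSketch {
    public static DataStream<Tuple2<Boolean, Row>> toRetract(StreamTableEnvironment tEnv, Table table) {
        // Each element is (flag, row): true = accumulate (insert), false = retract (delete).
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
        // For a pure DISTINCT query the flag should always be true, so keeping only
        // accumulate messages is safe before handing the rows to an append-only CSV sink.
        return retractStream.filter(t -> t.f0);
    }
}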

Best, JingsongLee


--
From:Caizhi Weng 
Send Time:2019年7月16日(星期二) 09:52
To:sri hari kali charan Tummala 
Cc:user 
Subject:Re: Stream to CSV Sink with SQL Distinct Values

Hi Kali,

Currently Flink treats all aggregate functions as retractable. As `distinct` is 
an aggregate function, it's considered by the planner that it might update or 
retract records (although from my perspective it won't...). Because csv table 
sink is an append only sink (it's hard to update what has been written in the 
middle of a file), the exception you mentioned occurs.

However, you can use `toAppendStream` method to change the retractable stream 
to an append only stream. For example, 
`tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get an 
append only stream. You can then add csv sink to this stream.
sri hari kali charan Tummala  于2019年7月16日周二 上午3:32写道:

Hi All, 

I am trying to read data from kinesis stream and applying SQL transformation 
(distinct) and then tryting to write to CSV sink which is failinf due to this 
issue (org.apache.flink.table.api.TableException: AppendStreamTableSink 
requires that Table has only insert changes.) , full code is here 
(https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112).

can anyone help me moveforward on this issue?

Full Code:- 
// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironment
//env.enableCheckpointing(10)

val tEnv = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new 
SimpleStringSchema(), consumerConfig))

val mapFunction: MapFunction[String, Tuple10[String, String, 
String,String,String,String,String,String,String,String]] =
  new MapFunction[String, Tuple10[String, String, 
String,String,String,String,String,String,String,String]]() {

override def map(s: String): Tuple10[String, String, 
String,String,String,String,String,String,String,String] = {
  val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

  val csvData = data.getCc_num+","+
data.getFirst+","+
data.getLast+","+
data.getTrans_num+","+
data.getTrans_time+","+
data.getCategory+","+
data.getMerchant+","+
data.getAmt+","+
data.getMerch_lat+","+
data.getMerch_long

  //println(csvData)

  val p:Array[String] = csvData.split(",")
  var cc_num:String = p(0)
  var first:String = p(1)
  var last:String = p(2)
  var trans_num:String = p(3)
  var trans_time:String = p(4)
  var category:String = p(5)
  var merchant:String = p(6)
  var amt:String = p(7)
  var merch_lat:String = p(8)
  var merch_long:String = p(9)

  val creationDate: Time = new Time(System.currentTimeMillis())
  return new Tuple10(cc_num, first, 
last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
}
  }

val data = kinesis.map(mapFunction)

//data.print()

tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct 
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
 FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)

val table1 = table.distinct()

tEnv.registerTable("fromAnotherTable",table1)

table.printSchema()

val csvSink:TableSink[Row]  = new 
CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput","~");
val fieldNames:Array[String]  = 
Array("cc_num","first_column","last_column","trans_num","trans_time","category_column","merchant_

Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread JingsongLee
Congratulations Rong. 
Rong Rong has done a lot of nice work in the past time to the flink community.

Best, JingsongLee


--
From:Rong Rong 
Send Time:2019年7月12日(星期五) 08:09
To:Hao Sun 
Cc:Xuefu Z ; dev ; Flink ML 

Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer

Thank you all for the warm welcome!

It's my honor to become an Apache Flink committer. 
I will continue to work on this great project and contribute more to the 
community.

Cheers,
Rong
On Thu, Jul 11, 2019 at 1:05 PM Hao Sun  wrote:

Congratulations Rong. 
On Thu, Jul 11, 2019, 11:39 Xuefu Z  wrote:
Congratulations, Rong!

On Thu, Jul 11, 2019 at 10:59 AM Bowen Li  wrote:
Congrats, Rong!


 On Thu, Jul 11, 2019 at 10:48 AM Oytun Tez  wrote:

 > Congratulations Rong!
 >
 > ---
 > Oytun Tez
 >
 > *M O T A W O R D*
 > The World's Fastest Human Translation Platform.
 > oy...@motaword.com — www.motaword.com
 >
 >
 > On Thu, Jul 11, 2019 at 1:44 PM Peter Huang 
 > wrote:
 >
 >> Congrats Rong!
 >>
 >> On Thu, Jul 11, 2019 at 10:40 AM Becket Qin  wrote:
 >>
 >>> Congrats, Rong!
 >>>
 >>> On Fri, Jul 12, 2019 at 1:13 AM Xingcan Cui  wrote:
 >>>
 >>>> Congrats Rong!
 >>>>
 >>>> Best,
 >>>> Xingcan
 >>>>
 >>>> On Jul 11, 2019, at 1:08 PM, Shuyi Chen  wrote:
 >>>>
 >>>> Congratulations, Rong!
 >>>>
 >>>> On Thu, Jul 11, 2019 at 8:26 AM Yu Li  wrote:
 >>>>
 >>>>> Congratulations Rong!
 >>>>>
 >>>>> Best Regards,
 >>>>> Yu
 >>>>>
 >>>>>
 >>>>> On Thu, 11 Jul 2019 at 22:54, zhijiang 
 >>>>> wrote:
 >>>>>
 >>>>>> Congratulations Rong!
 >>>>>>
 >>>>>> Best,
 >>>>>> Zhijiang
 >>>>>>
 >>>>>> --
 >>>>>> From:Kurt Young 
 >>>>>> Send Time:2019年7月11日(星期四) 22:54
 >>>>>> To:Kostas Kloudas 
 >>>>>> Cc:Jark Wu ; Fabian Hueske ;
 >>>>>> dev ; user 
 >>>>>> Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer
 >>>>>>
 >>>>>> Congratulations Rong!
 >>>>>>
 >>>>>> Best,
 >>>>>> Kurt
 >>>>>>
 >>>>>>
 >>>>>> On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas 
 >>>>>> wrote:
 >>>>>> Congratulations Rong!
 >>>>>>
 >>>>>> On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  wrote:
 >>>>>> Congratulations Rong Rong!
 >>>>>> Welcome on board!
 >>>>>>
 >>>>>> On Thu, 11 Jul 2019 at 22:25, Fabian Hueske 
 >>>>>> wrote:
 >>>>>> Hi everyone,
 >>>>>>
 >>>>>> I'm very happy to announce that Rong Rong accepted the offer of the
 >>>>>> Flink PMC to become a committer of the Flink project.
 >>>>>>
 >>>>>> Rong has been contributing to Flink for many years, mainly working on
 >>>>>> SQL and Yarn security features. He's also frequently helping out on the
 >>>>>> user@f.a.o mailing lists.
 >>>>>>
 >>>>>> Congratulations Rong!
 >>>>>>
 >>>>>> Best, Fabian
 >>>>>> (on behalf of the Flink PMC)
 >>>>>>
 >>>>>>
 >>>>>>
 >>>>


-- 
Xuefu Zhang

"In Honey We Trust!"


Re: Flink Table API and Date fields

2019-07-08 Thread JingsongLee
Flink 1.9's blink runner will support it as a generic type,
but I don't recommend that. After all, Java already has java.sql.Date and
java.time.*.

Best, JingsongLee


--
From:Flavio Pompermaier 
Send Time:2019年7月8日(星期一) 15:40
To:JingsongLee 
Cc:user 
Subject:Re: Flink Table API and Date fields

I think I could do it for this specific use case but isn't this a big 
limitation of Table API?
I think that java.util.Date should be a first class citizen in Flink..

Best,
Flavio
On Mon, Jul 8, 2019 at 4:06 AM JingsongLee  wrote:

Hi Flavio:
It looks like you use java.util.Date in your POJO. The Flink Table API currently does not
support BasicTypeInfo.DATE_TYPE_INFO because of the limitations of some type checks in
the code.
 Can you use java.sql.Date instead?

Best, JingsongLee

--
From:Flavio Pompermaier 
Send Time:2019年7月5日(星期五) 22:52
To:user 
Subject:Flink Table API and Date fields

Hi to all,
in my use case I have a stream of POJOs with Date fields.
When I use Table API I get the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. Type is not supported: Date
 at 
org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:112)
 at org.apache.flink.table.planner.StreamPlanner.toRel(StreamPlanner.scala:148)
 at org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:114)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:268)
Caused by: org.apache.flink.table.api.TableException: Type is not supported: 
Date
 at 
org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:357)


Is there a way to deal with this without converting the Date field to a Long 
one?

Best,
Flavio



Re: Flink Table API and Date fields

2019-07-07 Thread JingsongLee
Hi Flavio:
It looks like you use java.util.Date in your POJO. The Flink Table API currently does not
support BasicTypeInfo.DATE_TYPE_INFO because of the limitations of some type checks in
the code.
 Can you use java.sql.Date instead?
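For example, a hypothetical POJO using java.sql.Date, which the Table API can map to the SQL DATE type:

import java.sql.Date;

// Using java.sql.Date lets the field map to the SQL DATE type,
// whereas java.util.Date would only be handled as a generic type.
public class MyEvent {
    private String name;
    private Date eventDate;

    public MyEvent() {}

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Date getEventDate() { return eventDate; }
    public void setEventDate(Date eventDate) { this.eventDate = eventDate; }
}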

Best, JingsongLee


--
From:Flavio Pompermaier 
Send Time:2019年7月5日(星期五) 22:52
To:user 
Subject:Flink Table API and Date fields

Hi to all,
in my use case I have a stream of POJOs with Date fields.
When I use Table API I get the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. Type is not supported: Date
 at 
org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:112)
 at org.apache.flink.table.planner.StreamPlanner.toRel(StreamPlanner.scala:148)
 at org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:114)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:268)
Caused by: org.apache.flink.table.api.TableException: Type is not supported: 
Date
 at 
org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:357)


Is there a way to deal with this without converting the Date field to a Long 
one?

Best,
Flavio 

Re: Providing Custom Serializer for Generic Type

2019-07-04 Thread JingsongLee
Hi Andrea:
Why not make your MyClass a POJO? [1] If it is a POJO, then Flink
will use PojoTypeInfo and PojoSerializer, which already have a good
implementation.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#rules-for-pojo-types
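For reference, a minimal sketch of what the POJO rules in [1] require (public class, public no-argument constructor, fields public or accessible via getters/setters); the field names here are only placeholders:

// A class shaped like this is serialized with PojoTypeInfo/PojoSerializer
// instead of falling back to Kryo as a generic type.
public class MyClass {
    // public fields (or private fields with matching getters/setters) are both fine
    public String schema;
    public long timestamp;

    // a public no-argument constructor is required
    public MyClass() {}
}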

Best, JingsongLee


--
From:Andrea Spina 
Send Time:2019年7月4日(星期四) 14:37
To:user 
Subject:Providing Custom Serializer for Generic Type

Dear community,
in my job, I run with a custom event type MyClass which is a sort of "generic 
event" that I handle all along my streaming flow both as an event 
(DataStream[MyClass]) and as a managed state.

I see that Flink warns me about generic serialization of MyClass

 INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass does 
not contain a setter for field io$radicalbit$MyClass$$schema
 INFO [run-main-0] (TypeExtractor.java:1857) - Class class 
io.radicalbit.MyClass cannot be used as a POJO type because not all fields are 
valid POJO fields, and must be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance.
 INFO [run-main-0] (TypeExtractor.java:1818) - class io.radicalbit.MyClass does 
not contain a setter for field io$radicalbit$MyClass$schema

So that I wanted to provide my custom serializer for MyClass, trying first to 
register the Java one to check if the system recognizes it so I followed [1] 
but it seems that it is not considered.

I read then about [2] (the case is way akin to mine) and AFAIU I need to 
implement a custom TypeInformation and TypeSerializer for my class as suggested 
in [3] because Flink will ignore my registered serializer as long as it 
considers my type as generic.

config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[RadicalSerde])
My question finally is: Do I need to provide this custom classes? Is there any 
practical example for creating custom information like the above mentioned? I 
have had a quick preliminary look at it but seems that I need to provide a 
non-trivial amount of information to TypeInformation and TypeSerializer 
interfaces.

Thank you for your excellent work and help.

Cheers. 

[1] - 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html
[2] - 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Serializer-for-Avro-GenericRecord-td25433.html
[3] - 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory
-- 
Andrea Spina
Head of R @ Radicalbit Srl 
Via Giovanni Battista Pirelli 11, 20124, Milano - IT



Re: [ANNOUNCE] Apache Flink 1.8.1 released

2019-07-03 Thread JingsongLee
Thanks jincheng for your great job.

Best, JingsongLee


--
From:Congxian Qiu 
Send Time:2019年7月3日(星期三) 14:35
To:d...@flink.apache.org 
Cc:Dian Fu ; jincheng sun ; 
Hequn Cheng ; user ; announce 

Subject:Re: [ANNOUNCE] Apache Flink 1.8.1 released

Thanks for being the release manager and the great job

Best,
Congxian

Jark Wu  于2019年7月3日周三 上午10:23写道:
Thanks for being the release manager and the great job!

 Cheers,
 Jark

 On Wed, 3 Jul 2019 at 10:16, Dian Fu  wrote:

 > Awesome! Thanks a lot for being the release manager. Great job! @Jincheng
 >
 > Regards,
 > Dian
 >
 > 在 2019年7月3日,上午10:08,jincheng sun  写道:
 >
 > I've also tweeted about it from my twitter:
 > https://twitter.com/sunjincheng121/status/1146236834344648704
 > later would be tweeted it from @ApacheFlink!
 >
 > Best, Jincheng
 >
 > Hequn Cheng  于2019年7月3日周三 上午9:48写道:
 >
 >> Thanks for being the release manager and the great work Jincheng!
 >> Also thanks to Gorden and the community making this release possible!
 >>
 >> Best, Hequn
 >>
 >> On Wed, Jul 3, 2019 at 9:40 AM jincheng sun 
 >> wrote:
 >>
 >>> Hi,
 >>>
 >>> The Apache Flink community is very happy to announce the release of
 >>> Apache Flink 1.8.1, which is the first bugfix release for the Apache Flink
 >>> 1.8 series.
 >>>
 >>> Apache Flink(r) is an open-source stream processing framework for
 >>> distributed, high-performing, always-available, and accurate data streaming
 >>> applications.
 >>>
 >>> The release is available for download at:
 >>> https://flink.apache.org/downloads.html
 >>>
 >>> Please check out the release blog post for an overview of the
 >>> improvements for this bugfix release:
 >>> https://flink.apache.org/news/2019/07/02/release-1.8.1.html
 >>>
 >>> The full release notes are available in Jira:
 >>>
 >>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12345164
 >>>
 >>> We would like to thank all contributors of the Apache Flink community
 >>> who made this release possible!
 >>>
 >>> Great thanks to @Tzu-Li (Gordon) Tai  's offline
 >>> kind help!
 >>>
 >>> Regards,
 >>> Jincheng
 >>>
 >>
 >


Re: LookupableTableSource question

2019-07-02 Thread JingsongLee
> how do I enable Blink planner support? 
After flink-1.9 release, you can try Blink-planner.

>Since when is LATERAL TABLE available in Flink? Is it equivalent to using 
>temporal tables?
LATERAL TABLE is the table-function join syntax in Table/SQL; it has been available in Flink for a long
time.[1]
It is different from a temporal table.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/udfs.html#table-functions
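For illustration, a sketch of the LATERAL TABLE syntax with a registered table function (all names here are made up):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;

public class LateralTableSketch {

    // Hypothetical lookup function: emits zero or more enrichment values per key.
    public static class MyLookup extends TableFunction<String> {
        public void eval(String key) {
            // a real implementation would consult a cache or an external store here
            collect("value-for-" + key);
        }
    }

    public static Table query(StreamTableEnvironment tEnv) {
        tEnv.registerFunction("my_lookup", new MyLookup());
        // Join every row of Orders with the rows produced by the table function.
        return tEnv.sqlQuery(
            "SELECT o.order_id, t.enriched " +
            "FROM Orders AS o, LATERAL TABLE(my_lookup(o.order_key)) AS t(enriched)");
    }
}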

Best, JingsongLee


--
From:Flavio Pompermaier 
Send Time:2019年7月1日(星期一) 21:26
To:JingsongLee 
Cc:user 
Subject:Re: LookupableTableSource question

I probably messed up with the meaning of eval()..thus it is called once for 
every distinct key (that could be composed by a combination of fields)?
So, the other question is..how do I enable Blink planner support? 
Since when is LATERAL TABLE available in Flink? Is it equivalent to using 
temporal tables [1]?

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html

Best,
Flavio
On Sat, Jun 29, 2019 at 3:16 AM JingsongLee  wrote:
The keys parameter means the joint primary key fields; it is not a list of separate keys. In your case, maybe
there is a single key?

Best, Jingsong Lee


Sent from Ali Mail for iPhone
 --Original Mail --
From:Flavio Pompermaier 
Date:2019-06-28 22:53:31
Recipient:JingsongLee 
CC:user 
Subject:Re: LookupableTableSource question
Sorry I copied and pasted twice the current eval method...I'd do this:

public void eval(Object... keys) {
for (Object kkk : keys) {
Row keyRow = Row.of(kkk);
if (cache != null) {
List cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (Row cachedRow : cachedRows) {
collect(cachedRow);
}
return;
}
}
}
 ...
On Fri, Jun 28, 2019 at 4:51 PM Flavio Pompermaier  wrote:
This could be a good fit, I'll try to dig into it and see if it can be adapted 
to a REST service.
The only strange thing I see is that the key of the local cache is per block of 
keys..am I wrong?
Shouldn't it cycle over the list of passed keys?

Right now it's the following:

Cache> cache;

public void eval(Object... keys) {
Row keyRow = Row.of(keys);
if (cache != null) {
List cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (Row cachedRow : cachedRows) {
collect(cachedRow);
}
return;
}
}   
 ...

while I'd use the following (also for JDBC):

Cache> cache;

public void eval(Object... keys) {
Row keyRow = Row.of(keys);
if (cache != null) {
List cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (Row cachedRow : cachedRows) {
collect(cachedRow);
}
return;
}
}   
 ...

public void eval(Object... keys) {
for (Object kkk : keys) {
Row keyRow = Row.of(kkk);
if (cache != null) {
List cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (Row cachedRow : cachedRows) {
collect(cachedRow);
}
return;
}
}
}
 ...

Am I missing something?


On Fri, Jun 28, 2019 at 4:18 PM JingsongLee  wrote:
Hi Flavio:

I just implemented a JDBCLookupFunction[1]. You can use it as a table function[2],
or use
 the blink temporal table join[3] (needs blink planner support).
I added a Google Guava cache to JDBCLookupFunction with a configurable cacheMaxSize
(to avoid running out of memory) and cacheExpireMs (to keep the lookup table fresh).
Is that what you want?

[1] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
[2] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
[3] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75

 Best, JingsongLee

--
From:Flavio Pompermaier 
Send Time:2019年6月28日(星期五) 21:04
To:user 
Subject:LookupableTableSource question

Hi to all,
I have a use case where I'd like to enrich a stream using a rarely updated 
lookup table.
Basically, I'd like to be able to set a refresh policy that is triggered either 
when a key was not found (a new key has probably been added in the mean time) 
or a configurable refresh-period has elapsed.

Is there any suggested solution to this? The LookupableTableSource looks very 
similar to what I'd like to achieve but I can't find a real-world example using 
it and it 

Re: LookupableTableSource question

2019-06-28 Thread JingsongLee
The keys parameter means the joint primary key fields; it is not a list of separate keys. In your case, maybe
there is a single key?

Best, Jingsong Lee


Sent from Ali Mail for iPhone
 --Original Mail --
From:Flavio Pompermaier 
Date:2019-06-28 22:53:31
Recipient:JingsongLee 
CC:user 
Subject:Re: LookupableTableSource question
Sorry I copied and pasted twice the current eval method...I'd do this:

public void eval(Object... keys) {
for (Object kkk : keys) {
Row keyRow = Row.of(kkk);
if (cache != null) {
List cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (Row cachedRow : cachedRows) {
collect(cachedRow);
}
return;
}
}
}
 ...
On Fri, Jun 28, 2019 at 4:51 PM Flavio Pompermaier  wrote:

This could be a good fit, I'll try to dig into it and see if it can be adapted 
to a REST service.
The only strange thing I see is that the key of the local cache is per block of 
keys..am I wrong?
Shouldn't it cycle over the list of passed keys?

Right now it's the following:

Cache> cache;

public void eval(Object... keys) {
Row keyRow = Row.of(keys);
if (cache != null) {
List cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (Row cachedRow : cachedRows) {
collect(cachedRow);
}
return;
}
}   
 ...

while I'd use the following (also for JDBC):

Cache> cache;

public void eval(Object... keys) {
Row keyRow = Row.of(keys);
if (cache != null) {
List cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (Row cachedRow : cachedRows) {
collect(cachedRow);
}
return;
}
}   
 ...

public void eval(Object... keys) {
for (Object kkk : keys) {
Row keyRow = Row.of(kkk);
if (cache != null) {
List cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (Row cachedRow : cachedRows) {
collect(cachedRow);
}
return;
}
}
}
 ...

Am I missing something?


On Fri, Jun 28, 2019 at 4:18 PM JingsongLee  wrote:

Hi Flavio:

I just implemented a JDBCLookupFunction[1]. You can use it as a table function[2],
or use
 the blink temporal table join[3] (needs blink planner support).
I added a Google Guava cache to JDBCLookupFunction with a configurable cacheMaxSize
(to avoid running out of memory) and cacheExpireMs (to keep the lookup table fresh).
Is that what you want?

[1] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
[2] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
[3] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75

 Best, JingsongLee


--
From:Flavio Pompermaier 
Send Time:2019年6月28日(星期五) 21:04
To:user 
Subject:LookupableTableSource question

Hi to all,
I have a use case where I'd like to enrich a stream using a rarely updated 
lookup table.
Basically, I'd like to be able to set a refresh policy that is triggered either 
when a key was not found (a new key has probably been added in the mean time) 
or a configurable refresh-period has elapsed.

Is there any suggested solution to this? The LookupableTableSource looks very 
similar to what I'd like to achieve but I can't find a real-world example using 
it and it lacks of such 2 requirements (key-values are not refreshed after a 
configurable timeout and a KeyNotFound callback cannot be handled).

Any help is appreciated,
Flavio





Re: LookupableTableSource question

2019-06-28 Thread JingsongLee
Hi Flavio:

I just implemented a JDBCLookupFunction[1]. You can use it as a table function[2],
or use
 the blink temporal table join[3] (needs blink planner support).
I added a Google Guava cache to JDBCLookupFunction with a configurable cacheMaxSize
(to avoid running out of memory) and cacheExpireMs (to keep the lookup table fresh).
Is that what you want?

[1] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
[2] 
https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
[3] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75
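Not the linked implementation itself, but an illustrative sketch of the caching idea described above: a lookup table function with a Guava cache bounded by cacheMaxSize and expiring after cacheExpireMs; the external fetch is left abstract:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class CachedLookupFunction extends TableFunction<Row> {

    private final long cacheMaxSize;
    private final long cacheExpireMs;
    private transient Cache<Row, List<Row>> cache;

    public CachedLookupFunction(long cacheMaxSize, long cacheExpireMs) {
        this.cacheMaxSize = cacheMaxSize;
        this.cacheExpireMs = cacheExpireMs;
    }

    @Override
    public void open(FunctionContext context) {
        cache = CacheBuilder.newBuilder()
                .maximumSize(cacheMaxSize)                               // bound memory usage
                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)  // keep entries fresh
                .build();
    }

    public void eval(Object... keys) {
        Row keyRow = Row.of(keys);
        List<Row> rows = cache.getIfPresent(keyRow);
        if (rows == null) {
            rows = fetchFromExternalSystem(keyRow);  // e.g. a JDBC query or a REST call
            cache.put(keyRow, rows);
        }
        for (Row row : rows) {
            collect(row);
        }
    }

    // Placeholder for the actual lookup against the external table.
    private List<Row> fetchFromExternalSystem(Row keyRow) {
        throw new UnsupportedOperationException("implement the external lookup here");
    }
}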

 Best, JingsongLee


--
From:Flavio Pompermaier 
Send Time:2019年6月28日(星期五) 21:04
To:user 
Subject:LookupableTableSource question

Hi to all,
I have a use case where I'd like to enrich a stream using a rarely updated 
lookup table.
Basically, I'd like to be able to set a refresh policy that is triggered either 
when a key was not found (a new key has probably been added in the mean time) 
or a configurable refresh-period has elapsed.

Is there any suggested solution to this? The LookupableTableSource looks very 
similar to what I'd like to achieve but I can't find a real-world example using 
it and it lacks of such 2 requirements (key-values are not refreshed after a 
configurable timeout and a KeyNotFound callback cannot be handled).

Any help is appreciated,
Flavio



Re: Hello-world example of Flink Table API using a edited Calcite rule

2019-06-27 Thread JingsongLee
Got it, that's clear. TableStats is one of the important functions of ExternalCatalog.
That is the right way.

Best, JingsongLee


--
From:Felipe Gutierrez 
Send Time:2019年6月27日(星期四) 14:53
To:JingsongLee 
Cc:user 
Subject:Re: Hello-world example of Flink Table API using a edited Calcite rule

Hi JingsongLee,

Sorry for not explain very well. I am gonna try a clarification of my idea.
1 - I want to use InMemoryExternalCatalog in a way to save some statistics 
which I create by listening to a stream.
2 - Then I will have my core application using Table API to execute some 
aggregation/join.
3 - Because the application on 2 uses Table API, I am able to influence its 
plan through Calcite configuration rules. So, I am gonna use the statistics 
from 1 to change the rules dynamic on 2.

Do you think it is clear? and it is a feasible application with the current 
capabilities of Table API?
ps.: I am gonna look at the links that you mentioned. Thanks for that!

Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com

On Thu, Jun 27, 2019 at 7:23 AM JingsongLee  wrote:

Hi Felipe:

Yeah, you can use InMemoryExternalCatalog and CalciteConfig,
 but I don't quite understand what you mean.
InMemoryExternalCatalog provides methods to create, drop, and 
alter (sub-)catalogs or tables. And CalciteConfig is for defining a
 custom Calcite configuration. They are two separate things.

About InMemoryExternalCatalog, You can take a look at [1]. 
Csv has been renamed to OldCsv [2], But recommendation 
using the RFC-compliant `Csv` format in the dedicated
 `flink-formats/flink-csv` module instead.


[1] 
https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
[2] 
https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala

Best, JingsongLee

--
From:Felipe Gutierrez 
Send Time:2019年6月26日(星期三) 20:58
To:JingsongLee 
Cc:user 
Subject:Re: Hello-world example of Flink Table API using a edited Calcite rule

Hi JingsongLee,

it is still not very clear to me. I imagine that I can create an 
InMemoryExternalCatalog and insert some tuples there (which will be in memory). 
Then I can use Calcite to use the values of my InMemoryExternalCatalog and 
change my plan. Is that correct?

Do you have an example of how to create an InMemoryExternalCatalog using Flink 
1.8? Because I guess the Csv [1] class does not exist anymore.

[1] 
https://github.com/srapisarda/stypes-flink/blob/feature/sql/src/test/scala/uk/ac/bbk/dcs/stypes/flink/CalciteEmptyConsistencyTest.scala#L73

Kind Regards,
Felipe


--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com

On Wed, Jun 26, 2019 at 1:36 PM JingsongLee  wrote:
Hi Felipe:
I think your approach is absolutely right. You can try to do some plan test 
just like [1].
You can find more CalciteConfigBuilder API test in [2].

1.https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala#L168
2.https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/calcite/CalciteConfigBuilderTest.scala

Best, JingsongLee

--
From:Felipe Gutierrez 
Send Time:2019年6月26日(星期三) 18:04
To:user 
Subject:Hello-world example of Flink Table API using a edited Calcite rule

Hi,

does someone have a simple example using Table API and a Calcite rule which 
change/optimize the query execution plan of a query in Flink?

From the official documentation, I know that I have to create a CalciteConfig 
object [1]. Then, I based my firsts tests on this stackoverflow post [2] and I 
implemented this piece of code:

 // change the current calcite config plan
 CalciteConfigBuilder ccb = new CalciteConfigBuilder();
 RuleSet ruleSets = RuleSets.ofList(FilterMergeRule.INSTANCE);
 ccb.addLogicalOptRuleSet(ruleSets);
 TableConfig tableConfig = new TableConfig();
 tableConfig.setCalciteConfig(ccb.build());

I suppose that with this I can change the query plan of the Flink Table API. I 
am also not sure if I will need to use an external catalog like this post 
assumes to use [3].
In a nutshell, I would like to have a simple example where I can execute a 
query using Flink Table API and change its query execution plan using a Calcite 
rule. Does anyone have a Hello world of it? I plan to use it on this example 
[4].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#query-optimization
[2] 
https://stackoverflow.com/questions/54278508/how-can-i-create-an-external-catalog-table-in-apache-flink
[3] 
https://stackoverflow.com/questions

Re: Best Flink SQL length proposal

2019-06-26 Thread JingsongLee
Hi Simon,

Hope you can wrap them simply.
In our scenario, there are also many jobs with that many columns;
 the huge generated code not only leads to compile exceptions, but also
 prevents the code from being optimized by the JIT.

We are planning to introduce a Java code splitter (which analyzes the Java code and
 splits it appropriately at compile time) to solve this problem
thoroughly in the blink planner. It will probably land in release-1.10.

Best, JingsongLee


--
From:Simon Su 
Send Time:2019年6月27日(星期四) 11:22
To:JingsongLee 
Cc:user 
Subject:Re: Best Flink SQL length proposal


 
Hi Jiongsong

Thanks for your reply.

It seems that to wrap fields is a feasible way for me now. And there 
already exists another JIRA FLINK-8921 try to improve this.

Thanks,
Simon
On 06/26/2019 19:21,JingsongLee wrote: 
Hi Simon:
Does your code include the PR[1]? 
If include: try set TableConfig.setMaxGeneratedCodeLength smaller (default 
64000)?
If exclude: Can you wrap some fields to a nested Row field to reduce field 
number.

1.https://github.com/apache/flink/pull/5613

--
From:Simon Su 
Send Time:2019年6月26日(星期三) 17:49
To:user 
Subject:Best Flink SQL length proposal


 Hi all, 
Currently I faced a problem caused by a long Flink SQL. 
My sql is like “insert into tableA select a, b, c …….from sourceTable”, I 
have more than 1000 columns in select target, so that’s the problem, flink code 
generator will generate a RichMapFunction class and contains a map function 
which exceed the JVM max method limit (64kb). It throws the exception like:
 Caused by: java.lang.RuntimeException: Compiling 
"DataStreamSinkConversion$3055": Code of method 
"map(Ljava/lang/Object;)Ljava/lang/Object;" of class 
"DataStreamSinkConversion$3055" grows beyond 64 KB
So is there any best practice for this ?

Thanks,
Simon






Re: Hello-world example of Flink Table API using a edited Calcite rule

2019-06-26 Thread JingsongLee
Hi Felipe:

Yeah, you can use InMemoryExternalCatalog and CalciteConfig,
 but I don't quite understand what you mean.
InMemoryExternalCatalog provides methods to create, drop, and 
alter (sub-)catalogs or tables. And CalciteConfig is for defining a
 custom Calcite configuration. They are two separate things.

About InMemoryExternalCatalog, You can take a look at [1]. 
Csv has been renamed to OldCsv [2], But recommendation 
using the RFC-compliant `Csv` format in the dedicated
 `flink-formats/flink-csv` module instead.


[1] 
https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
[2] 
https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/OldCsv.scala

Best, JingsongLee


--
From:Felipe Gutierrez 
Send Time:2019年6月26日(星期三) 20:58
To:JingsongLee 
Cc:user 
Subject:Re: Hello-world example of Flink Table API using a edited Calcite rule

Hi JingsongLee,

it is still not very clear to me. I imagine that I can create an 
InMemoryExternalCatalog and insert some tuples there (which will be in memory). 
Then I can use Calcite to use the values of my InMemoryExternalCatalog and 
change my plan. Is that correct?

Do you have an example of how to create an InMemoryExternalCatalog using Flink 
1.8? Because I guess the Csv [1] class does not exist anymore.

[1] 
https://github.com/srapisarda/stypes-flink/blob/feature/sql/src/test/scala/uk/ac/bbk/dcs/stypes/flink/CalciteEmptyConsistencyTest.scala#L73

Kind Regards,
Felipe


--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com

On Wed, Jun 26, 2019 at 1:36 PM JingsongLee  wrote:

Hi Felipe:
I think your approach is absolutely right. You can try to do some plan test 
just like [1].
You can find more CalciteConfigBuilder API test in [2].

1.https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala#L168
2.https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/calcite/CalciteConfigBuilderTest.scala

Best, JingsongLee

--
From:Felipe Gutierrez 
Send Time:2019年6月26日(星期三) 18:04
To:user 
Subject:Hello-world example of Flink Table API using a edited Calcite rule

Hi,

does someone have a simple example using Table API and a Calcite rule which 
change/optimize the query execution plan of a query in Flink?

From the official documentation, I know that I have to create a CalciteConfig 
object [1]. Then, I based my firsts tests on this stackoverflow post [2] and I 
implemented this piece of code:

 // change the current calcite config plan
 CalciteConfigBuilder ccb = new CalciteConfigBuilder();
 RuleSet ruleSets = RuleSets.ofList(FilterMergeRule.INSTANCE);
 ccb.addLogicalOptRuleSet(ruleSets);
 TableConfig tableConfig = new TableConfig();
 tableConfig.setCalciteConfig(ccb.build());

I suppose that with this I can change the query plan of the Flink Table API. I 
am also not sure if I will need to use an external catalog like this post 
assumes to use [3].
In a nutshell, I would like to have a simple example where I can execute a 
query using Flink Table API and change its query execution plan using a Calcite 
rule. Does anyone have a Hello world of it? I plan to use it on this example 
[4].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#query-optimization
[2] 
https://stackoverflow.com/questions/54278508/how-can-i-create-an-external-catalog-table-in-apache-flink
[3] 
https://stackoverflow.com/questions/54278508/how-can-i-create-an-external-catalog-table-in-apache-flink
[4] 
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/table/MqttSensorDataAverageTableAPI.java#L40

Kind Regards,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com




Re: Hello-world example of Flink Table API using a edited Calcite rule

2019-06-26 Thread JingsongLee
Hi Felipe:
I think your approach is absolutely right. You can try to do some plan test 
just like [1].
You can find more CalciteConfigBuilder API test in [2].

1.https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala#L168
2.https://github.com/apache/flink/blob/release-1.8/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/calcite/CalciteConfigBuilderTest.scala

Best, JingsongLee


--
From:Felipe Gutierrez 
Send Time:2019年6月26日(星期三) 18:04
To:user 
Subject:Hello-world example of Flink Table API using a edited Calcite rule

Hi,

does someone have a simple example using Table API and a Calcite rule which 
change/optimize the query execution plan of a query in Flink?

From the official documentation, I know that I have to create a CalciteConfig 
object [1]. Then, I based my firsts tests on this stackoverflow post [2] and I 
implemented this piece of code:

 // change the current calcite config plan
 CalciteConfigBuilder ccb = new CalciteConfigBuilder();
 RuleSet ruleSets = RuleSets.ofList(FilterMergeRule.INSTANCE);
 ccb.addLogicalOptRuleSet(ruleSets);
 TableConfig tableConfig = new TableConfig();
 tableConfig.setCalciteConfig(ccb.build());

I suppose that with this I can change the query plan of the Flink Table API. I 
am also not sure if I will need to use an external catalog like this post 
assumes to use [3].
In a nutshell, I would like to have a simple example where I can execute a 
query using Flink Table API and change its query execution plan using a Calcite 
rule. Does anyone have a Hello world of it? I plan to use it on this example 
[4].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#query-optimization
[2] 
https://stackoverflow.com/questions/54278508/how-can-i-create-an-external-catalog-table-in-apache-flink
[3] 
https://stackoverflow.com/questions/54278508/how-can-i-create-an-external-catalog-table-in-apache-flink
[4] 
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/table/MqttSensorDataAverageTableAPI.java#L40

Kind Regards,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com



Re: Best Flink SQL length proposal

2019-06-26 Thread JingsongLee
Hi Simon:
Does your code include the PR[1]? 
If include: try set TableConfig.setMaxGeneratedCodeLength smaller (default 
64000)?
If exclude: Can you wrap some fields to a nested Row field to reduce field 
number.

1.https://github.com/apache/flink/pull/5613
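For illustration (assuming a build that includes that PR), the threshold can be lowered on the TableConfig; the value below is just a placeholder:

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;

public class CodeLengthConfigSketch {
    public static void configure(TableEnvironment tEnv) {
        TableConfig config = tEnv.getConfig();
        // Default is 64000; a smaller value makes the code generator split the
        // generated method bodies earlier so no single method exceeds the JVM limit.
        config.setMaxGeneratedCodeLength(4000);
    }
}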


--
From:Simon Su 
Send Time:2019年6月26日(星期三) 17:49
To:user 
Subject:Best Flink SQL length proposal


 Hi all, 
Currently I faced a problem caused by a long Flink SQL. 
My sql is like “insert into tableA select a, b, c …….from sourceTable”, I 
have more than 1000 columns in select target, so that’s the problem, flink code 
generator will generate a RichMapFunction class and contains a map function 
which exceed the JVM max method limit (64kb). It throws the exception like:
 Caused by: java.lang.RuntimeException: Compiling 
"DataStreamSinkConversion$3055": Code of method 
"map(Ljava/lang/Object;)Ljava/lang/Object;" of class 
"DataStreamSinkConversion$3055" grows beyond 64 KB
So is there any best practice for this ?

Thanks,
Simon





Re: TableException

2019-06-12 Thread JingsongLee
Hi Pramit:
AppendStreamTableSink defines an external TableSink to emit a streaming table 
with only insert changes. If the table is also modified by update or delete 
changes, a TableException will be thrown.[1]
Your SQL seems to have update or delete changes.
You can try to use RetractStreamTableSink or UpsertStreamTableSink. 
(Unfortunately, we don't have Retract/Upsert JDBC Sink now, you can try to do 
by yourself)


[1]https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#appendstreamtablesink

Best, JingsongLee


--
From:Pramit Vamsi 
Send Time:2019年6月13日(星期四) 01:35
To:user 
Subject:TableException

Hi,

I am attempting the following:

String sql = "INSERT INTO table3 "
+ "SELECT col1, col2,  window_start_time ,  window_end_time , 
MAX(col3), MAX(col4), MAX(col5) FROM "
+ "(SELECT col1,col2, "
+ "TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start_time, "
+ "TUMBLE_END(ts, INTERVAL '1' MINUTE) as window_end_time, "
  
+ "FROM table1"
+ "WHERE"
+ "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), col1, col2"
+ "UNION "
+ "SELECT col1, col2, "
+ "TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start_time, "
+ "TUMBLE_END(ts, INTERVAL '1' MINUTE) as window_end_time, "

+ "FROM table2"
+ "WHERE . "
+ "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),  col1, col2  ) "
+ " window_start_time, window_end_time, col1, col2";

tableEnv.sqlUpdate( sql  );

I am using JDBCAppendTableSink. 

Exception:
org.apache.flink.table.api.TableException: AppendStreamTableSink requires that 
Table has only insert changes.

What in the query should I fix? 



Re: can flink sql handle udf-generated timestamp field

2019-06-06 Thread JingsongLee
Hi @Yu Yang:
Time-based operations such as windows in both the Table API and SQL require
 information about the notion of time and its origin. Therefore, tables can 
offer
 logical time attributes for indicating time and accessing corresponding 
timestamps
 in table programs.[1]
This means a window can only be defined over a time attribute column.
You need to define a rowtime attribute in your source, like this (UserActionTime is a long
field; you don't need to convert it to Timestamp):
Table table = tEnv.fromDataStream(stream, "Username, Data, 
UserActionTime.rowtime");
See more information in below document:
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time
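As a sketch of that approach for the query in this thread (the tuple layout, field names, and the watermark bound are assumptions):

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RowtimeFromLongSketch {

    public static Table register(StreamExecutionEnvironment env,
                                 StreamTableEnvironment tEnv,
                                 DataStream<Tuple3<String, Double, Long>> orders) {
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Assign event-time timestamps and watermarks from the long epoch-millis field (f2).
        DataStream<Tuple3<String, Double, Long>> withTs = orders.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Double, Long>>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Tuple3<String, Double, Long> order) {
                    return order.f2;
                }
            });

        // The long field becomes a proper rowtime attribute; no Timestamp UDF is needed.
        Table table = tEnv.fromDataStream(withTs, "keyid, amount, ts.rowtime");
        tEnv.registerTable("orders", table);
        // GROUP BY HOP(ts, INTERVAL '10' SECOND, INTERVAL '30' SECOND), keyid now works.
        return table;
    }
}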

Best, JingsongLee


--
From:Yu Yang 
Send Time:2019年6月5日(星期三) 14:57
To:user 
Subject:can flink sql handle udf-generated timestamp field

Hi, 

I am trying to use Flink SQL to do aggregation on a hopping window. In the data 
stream, we store the timestamp in long type. So I wrote a UDF 'FROM_UNIXTIME' 
to convert long to Timestamp type. 

  public static class TimestampModifier extends ScalarFunction {
public Timestamp eval(long t) {
  return new Timestamp(t);
}
public TypeInformation getResultType(Class[] signature) {
  return Types.SQL_TIMESTAMP;
}
  }

With the above UDF, I wrote the following query, and ran into  
"ProgramInvocationException: The main method caused an error: Window can only 
be defined over a time attribute column". 
Any suggestions on how to resolve this issue? I am using Flink 1.8 for this 
experiment. 

my sql query: 

select  keyid, sum(value) 
from ( 
   select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value 
   from orders) 
 group by HOP(ts, INTERVAL ‘10’ second, INTERVAL ‘30’ second), keyid 

flink exception: 

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Window can only be defined over a time attribute column.
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
 at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
 at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
 at 
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.table.api.ValidationException: Window can only be 
defined over a time attribute column.
 at 
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
 at 
org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
 at 
org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
 at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
 at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
 at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)

Regards, 
-Yu



Re: Clean way of expressing UNNEST operations

2019-06-04 Thread JingsongLee
Hi @Piyush Narang

I tried again: if the type of advertiser_event.products is derived correctly 
(ObjectTypeInfo(RowTypeInfo(fields...))), it will work. See more information in the 
Calcite code: SqlUnnestOperator.inferReturnType.
So I think maybe your type is not being passed to the engine correctly.
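If the stream is registered without explicit type information, a sketch like the following declares the nested row type when registering the DataStream, so the planner can see the element fields (the field names, types, rawStream and toEventRow conversion here are assumptions, not your actual code):

// Element type of the products array, with named fields.
TypeInformation<Row> productType = Types.ROW_NAMED(
    new String[] {"price", "quantity"},
    Types.DOUBLE, Types.INT);

// Type of the whole event row: partnerId plus an object array of products.
TypeInformation<Row> eventType = Types.ROW_NAMED(
    new String[] {"partnerId", "products"},
    Types.INT, Types.OBJECT_ARRAY(productType));

DataStream<Row> events = rawStream
    .map(value -> toEventRow(value))  // your own conversion to Row
    .returns(eventType);
tEnv.registerDataStream("advertiser_event", events);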

Best, JingsongLee


--
From:JingsongLee 
Send Time: Jun 4, 2019 (Tuesday) 13:35
To:Piyush Narang ; user@flink.apache.org 

Subject:Re: Clean way of expressing UNNEST operations

Hi @Piyush Narang
It seems that Calcite's type inference is not perfect, and the fields of the return 
type cannot be inferred in UNNEST. (The errors are reported during the Calcite 
validate phase.)

But a UDTF supports this usage, so if it's convenient, you might consider 
writing a UDTF with similar UNNEST functionality and trying it out (use JOIN LATERAL 
TABLE).

Best, JingsongLee


--
From:Piyush Narang 
Send Time: Jun 4, 2019 (Tuesday) 00:20
To:user@flink.apache.org 
Subject:Clean way of expressing UNNEST operations

  
Hi folks,
 
I’m using the SQL API and trying to figure out the best way to unnest and 
operate on some data. 
My data is structured as follows:
Table:
Advertiser_event: 
Partnered: Int
Products: Array< Row< price: Double, quantity: Int, … > >
…
 
I’m trying to unnest the products array and then compute something on a couple 
of fields in the product row (e.g. price * quantity)
 
My query looks like this:
SELECT partnerId, price, quantity FROM advertiser_event, 
UNNEST(advertiser_event.products) AS t (price, quantity, field3, field4, …)
 
My issue / problem is that, when I try to unnest this array I need to 
specify all the fields in the temp table as part of the unnest (“t” above). If 
I don’t, I get an error saying the number of fields doesn’t match what is 
expected. This makes my query a bit fragile in case additional fields are added 
/ removed from this product structure. 
 
Does anyone know if there’s a way around this? As a contrast on an engine like 
Presto, the unnest operation would yield a ‘product’ row type which I can then 
use to pick the fields I want “product.price”, “product.quantity”.
Presto query:
SELECT partnerId, product.price, product.quantity FROM advertiser_event CROSS 
JOIN UNNEST(products) AS product
 
Thanks,
 
-- Piyush


Re: Clean way of expressing UNNEST operations

2019-06-03 Thread JingsongLee
Hi @Piyush Narang
It seems that Calcite's type inference is not perfect, and the fields of the return 
type cannot be inferred in UNNEST. (The errors are reported during the Calcite 
validate phase.)

But a UDTF supports this usage, so if it's convenient, you might consider 
writing a UDTF with similar UNNEST functionality and trying it out (use JOIN LATERAL 
TABLE).
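For illustration, a rough, untested sketch of such a UDTF (the field names and types are assumptions based on your schema; only the fields you actually need have to be emitted):

// Emits one row per product element; getResultType tells the planner the output fields.
public static class ExplodeProducts extends TableFunction<Row> {

    public void eval(Row[] products) {
        if (products != null) {
            for (Row product : products) {
                // keep only the fields you care about (positions are illustrative)
                collect(Row.of(product.getField(0), product.getField(1)));
            }
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW_NAMED(
            new String[] {"price", "quantity"},
            Types.DOUBLE, Types.INT);
    }
}

// Registration and usage:
tEnv.registerFunction("explode_products", new ExplodeProducts());
Table result = tEnv.sqlQuery(
    "SELECT partnerId, price, quantity " +
    "FROM advertiser_event, " +
    "LATERAL TABLE(explode_products(products)) AS t(price, quantity)");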

Best, JingsongLee


--
From:Piyush Narang 
Send Time: Jun 4, 2019 (Tuesday) 00:20
To:user@flink.apache.org 
Subject:Clean way of expressing UNNEST operations

  
Hi folks,
 
I’m using the SQL API and trying to figure out the best way to unnest and 
operate on some data. 
My data is structured as follows:
Table:
Advertiser_event: 
Partnered: Int
Products: Array< Row< price: Double, quantity: Int, … > >
…
 
I’m trying to unnest the products array and then compute something on a couple 
of fields in the product row (e.g. price * quantity)
 
My query looks like this:
SELECT partnerId, price, quantity FROM advertiser_event, 
UNNEST(advertiser_event.products) AS t (price, quantity, field3, field4, …)
 
My issue / problem is that, when I try to unnest this array I need to 
specify all the fields in the temp table as part of the unnest (“t” above). If 
I don’t, I get an error saying the number of fields doesn’t match what is 
expected. This makes my query a bit fragile in case additional fields are added 
/ removed from this product structure. 
 
Does anyone know if there’s a way around this? As a contrast on an engine like 
Presto, the unnest operation would yield a ‘product’ row type which I can then 
use to pick the fields I want “product.price”, “product.quantity”.
Presto query:
SELECT partnerId, product.price, product.quantity FROM advertiser_event CROSS 
JOIN UNNEST(products) AS product
 
Thanks,
 
-- Piyush


Re: Flink SQL: Execute DELETE queries

2019-05-28 Thread JingsongLee
Hi @Papadopoulos, Konstantinos
I think you can try something like this:
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
   .setDrivername("foo")                     // your JDBC driver class
   .setDBUrl("bar")                          // your JDBC URL
   .setQuery("delete from %s where id = ?")  // fill in the target table name
   .setParameterTypes(FIELD_TYPES)           // types of the statement parameters
   .build();
Or you can build your own sink, in which you can delete rows from the DB table.
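For completeness, a rough sketch of wiring such a sink up (driver, URL, table and column names are placeholders; one DELETE is executed per row written to the sink):

JDBCAppendTableSink deleteSink = JDBCAppendTableSink.builder()
    .setDrivername("org.postgresql.Driver")        // placeholder driver
    .setDBUrl("jdbc:postgresql://host:5432/mydb")  // placeholder URL
    .setQuery("DELETE FROM my_table WHERE id = ?")
    .setParameterTypes(Types.LONG)
    .build();

tEnv.registerTableSink(
    "delete_sink",
    new String[] {"id"},
    new TypeInformation[] {Types.LONG},
    deleteSink);

// Every id produced by this query triggers one DELETE against the target table.
tEnv.sqlUpdate(
    "INSERT INTO delete_sink SELECT id FROM my_source WHERE status = 'obsolete'");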

Best, JingsongLee


--
From:Papadopoulos, Konstantinos 
Send Time: May 28, 2019 (Tuesday) 22:54
To:Vasyl Bervetskyi 
Cc:user@flink.apache.org 
Subject:RE: Flink SQL: Execute DELETE queries

  
The case I have in mind is to have an external JDBC table sink and to delete some or 
all rows of the target DB table. Is this possible using Flink SQL?
 
From: Vasyl Bervetskyi  
Sent: Tuesday, May 28, 2019 5:36 PM
To: Papadopoulos, Konstantinos 
Cc: user@flink.apache.org
Subject: RE: Flink SQL: Execute DELETE queries  
Hi Papadopoulos,
 
Unfortunately not; there are no DELETE or MODIFY queries. You would have to create a new 
table as the result of a query that filters records from the existing one.
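For example (a sketch; the table names, the predicate and the tEnv variable are placeholders), the usual pattern is to select the rows you want to keep into a new table or sink:

// Keep everything except the rows you would otherwise have deleted.
Table kept = tEnv.sqlQuery("SELECT * FROM source_table WHERE status <> 'obsolete'");
kept.insertInto("target_sink");  // a previously registered table sink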
 
From: Papadopoulos, Konstantinos  
Sent: Tuesday, May 28, 2019 5:25 PM
To: user@flink.apache.org
Subject: Flink SQL: Execute DELETE queries  
 
Hi all,
 
I am experimenting with the Flink Table API & SQL and I have the following question: is 
there any way to execute DELETE queries using Flink SQL?
 
Thanks in advance,
Konstantinos  

Re: Generic return type on a user-defined scalar function

2019-05-20 Thread JingsongLee
Hi Morrisa:

It seems that the Flink planner does not support returning Object (or a generic type 
that erases to it, as you say) from a ScalarFunction.
In ScalarFunctionCallGen:
val functionCallCode =
  s"""
|${parameters.map(_.code).mkString("\n")}
|$resultTypeTerm $resultTerm = $functionReference.eval(
|  ${parameters.map(_.resultTerm).mkString(", ")});
|""".stripMargin
There would need to be a cast of the eval return value to support this 
situation.
I have no idea how to bypass it. If you can modify the source code, you can change 
it as follows to support a generic return type:
val functionCallCode =
  s"""
|${parameters.map(_.code).mkString("\n")}
|$resultTypeTerm $resultTerm = ($resultTypeTerm) $functionReference.eval(
|  ${parameters.map(_.resultTerm).mkString(", ")});
|""".stripMargin

Best, JingsongLee


--
From:Timo Walther 
Send Time: May 20, 2019 (Monday) 23:03
To:user 
Subject:Re: Generic return type on a user-defined scalar function

 
Hi Morrisa,

usually, this means that your class is not recognized as a POJO. Please check 
again the requirements of a POJO: Default constructor, getters and setters for 
every field etc. You can use 
org.apache.flink.api.common.typeinfo.Types.POJO(...) to verify if your class is 
a POJO or not.
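For example, a quick check along these lines (the class name is a placeholder) should fail if the class cannot be analyzed as a POJO:

// Throws an exception if MyCustomType does not satisfy the POJO rules
// (public class, public no-arg constructor, public fields or getters/setters, ...).
TypeInformation<MyCustomType> info =
    org.apache.flink.api.common.typeinfo.Types.POJO(MyCustomType.class);
System.out.println(info);  // a valid POJO prints a PojoTypeInfo listing its fields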

I hope this helps.

Regards,
Timo


On 16.05.19 at 23:18, Morrisa Brenner wrote:


Hi Flink folks, 
In a Flink job using the SQL API that I’m working on, I have a custom POJO data 
type with a generic field, and I would like to be able to call a user-defined 
function on this field. I included a similar function below with the business 
logic stubbed out, but this example has the return type I'm looking for. 
I have no issues using custom functions of this type when they're used in a 
select statement and the `getResultType` method is excluded from the 
user-defined function class, but I am unable to get the type information to 
resolve correctly in contexts like order by and group by statements. It still 
doesn't work even if the `getResultType` method defines the specific type for a 
given object explicitly because the job compiler within Flink seems to be 
assuming the return type from the `eval` method is just an Object (type 
erasure...), and it fails to generate the object code because it's detecting 
invalid casts to the desired output type. Without the `getResultType` method, 
it just fails to detect type entirely. This seems to be fine when it's just a 
select, but if I try to make it do any operation (like group by) I get the 
following error: "org.apache.flink.api.common.InvalidProgramException: This 
type (GenericType) cannot be used as key." 
Does anyone know if there's a way to get Flink to pay attention to the type 
information from `getResultType` when compiling the `eval` method so that the 
types work out? Or another way to work around the type erasure on the eval 
method without defining explicit user-defined function classes for each type? 
Thanks for your help! 
Morrisa 


Code snippet: 

package flink_generics_testing;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Reads custom values from a table and performs a function on those values.
 * T should be able to be a String, long, float, boolean, or Date.
 *
 * @param <T> The expected type of the table column values.
 */
public class CustomScalarFunction<T> extends ScalarFunction {

    private static final long serialVersionUID = -5537657771138360838L;

    private final Class<T> desiredType;

    /**
     * Construct an instance.
     *
     * @param desiredType The type of the value that we're performing the function on.
     */
    public CustomScalarFunction(Class<T> desiredType) {
        this.desiredType = desiredType;
    }

    public T eval(T value) {
        return value;
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return TypeInformation.of(desiredType);
    }

    @Override
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        return new TypeInformation[]{
            TypeInformation.of(desiredType)
        };
    }
}

 -- 



Morrisa Brenner
Software Engineer 



 225 Franklin St, Boston, MA 02110
klaviyo.com 



Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

2019-04-19 Thread JingsongLee
Hi @Oytun Tez
It looks like your PatternFlatSelectFunction is not serializable.
Because you use anonymous inner classes, they hold a reference to the enclosing class. 
Check the class to which getPending belongs; maybe that class is not serializable.
Alternatively, you may want to avoid (non-static) inner classes and use static nested 
classes instead.
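For illustration, a rough sketch of the static-nested-class variant (the type parameters WorkerEvent/Project are taken from your description; bodies omitted):

// Static nested classes keep no hidden reference to the enclosing (possibly
// non-serializable) class, so they serialize on their own.
public static class PendingTimeout
        implements PatternFlatTimeoutFunction<WorkerEvent, Project> {
    @Override
    public void timeout(Map<String, List<WorkerEvent>> pattern,
                        long timeoutTimestamp, Collector<Project> out) {
        // build the timed-out Project here
    }
}

public static class PendingSelect
        implements PatternFlatSelectFunction<WorkerEvent, Project> {
    @Override
    public void flatSelect(Map<String, List<WorkerEvent>> pattern,
                           Collector<Project> out) {
        // matched case
    }
}

// and then:
// patternStream.flatSelect(pendingProjectsTag, new PendingTimeout(), new PendingSelect())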

Best, JingsongLee


--
From:Oytun Tez 
Send Time: Apr 19, 2019 (Friday) 03:38
To:user 
Subject:PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

Hi all,

We are just migrating from 1.6 to 1.8. I encountered a serialization error 
which we didn't have before, if memory serves: The implementation of the 
PatternFlatSelectAdapter is not serializable. The object probably contains or 
references non serializable fields.

The method below simply takes a PatternStream from CEP.pattern() and makes 
use of the side output for timed-out events. Can you see anything weird here 
(WorkerEvent is the input, but the collectors collect Project objects)?

protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
    OutputTag<Project> pendingProjectsTag =
            new OutputTag<Project>("invitation-pending-projects"){};

    return patternStream.flatSelect(
            pendingProjectsTag,
            new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
                @Override
                public void timeout(Map<String, List<WorkerEvent>> map,
                                    long l, Collector<Project> collector) {
                }
            },
            new PatternFlatSelectFunction<WorkerEvent, Project>() {
                @Override
                public void flatSelect(Map<String, List<WorkerEvent>> pattern,
                                       Collector<Project> collector) {
                }
            }
    ).name("Select pending projects for invitation").getSideOutput(pendingProjectsTag);
}

---
Oytun Tez

M O T A W O R D
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com 

Re: Is it possible to handle late data when using table API?

2019-04-16 Thread JingsongLee
To set the rowtime watermark delay of a source you can do the following:
val desc = new Schema()
  .field("a", Types.INT)
  .field("e", Types.LONG)
  .field("f", Types.STRING)
  .field("t", Types.SQL_TIMESTAMP)
  .rowtime(new Rowtime().timestampsFromField("t").watermarksPeriodicBounded(1000))
Use the watermarksPeriodicBounded API to set the delay (in milliseconds).
And then call XXTableFactory.createStreamTableSource(desc.toProperties).
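If your machineInsights table is instead converted from a DataStream, a similar effect can be achieved by delaying the watermark before the conversion. A rough sketch in Java (MachineInsight, getUserActionTime and tableEnv are assumptions based on your snippet):

// Tolerate events that arrive up to 1 minute late by holding the watermark back.
DataStream<MachineInsight> withTimestamps = input.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<MachineInsight>(Time.minutes(1)) {
        @Override
        public long extractTimestamp(MachineInsight element) {
            return element.getUserActionTime();  // epoch millis
        }
    });

Table machineInsights = tableEnv.fromDataStream(
    withTimestamps, "machineId, machineInsightId, value, UserActionTime.rowtime");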


--
From:JingsongLee 
Send Time: Apr 16, 2019 (Tuesday) 17:09
To:Lasse Nedergaard ; user 
Subject: Re: Is it possible to handle late data when using table API?

Hi @Lasse Nedergaard, the Table API doesn't have an allowedLateness API.
But you can set rowtime.watermarks.delay on the source to slow down the watermark 
clock.


--
From: Lasse Nedergaard 
Send Time: Apr 16, 2019 (Tuesday) 16:20
To: user 
Subject: Is it possible to handle late data when using table API?

Hi.

I have a simple tumble window working on event time, like this:

Table res30MinWindows = machineInsights
    .window(Tumble.over("30.minutes").on("UserActionTime").as("w"))  // define window
    .groupBy("machineId, machineInsightId, w")  // group by key and window
    .select("machineId, machineInsightId, w.start, w.end, w.rowtime, value.max as max");  // access window properties and aggregate
As we work with IoT units, we don't have 100% control over the event time reported and 
therefore need to handle late data to ensure that we don't get our calculations wrong.
I would like to know if there is any option in the Table API to get access to late 
data, or if my only option is to use the Streaming API.
Thanks in advance
Lasse Nedergaard




Re: Is it possible to handle late data when using table API?

2019-04-16 Thread JingsongLee
Hi @Lasse Nedergaard, the Table API doesn't have an allowedLateness API.
But you can set rowtime.watermarks.delay on the source to slow down the watermark 
clock.


--
From: Lasse Nedergaard 
Send Time: Apr 16, 2019 (Tuesday) 16:20
To: user 
Subject: Is it possible to handle late data when using table API?

Hi.

I have a simple tumble window working on event time, like this:

Table res30MinWindows = machineInsights
    .window(Tumble.over("30.minutes").on("UserActionTime").as("w"))  // define window
    .groupBy("machineId, machineInsightId, w")  // group by key and window
    .select("machineId, machineInsightId, w.start, w.end, w.rowtime, value.max as max");  // access window properties and aggregate
As we work with IoT units, we don't have 100% control over the event time reported and 
therefore need to handle late data to ensure that we don't get our calculations wrong.
I would like to know if there is any option in the Table API to get access to late 
data, or if my only option is to use the Streaming API.
Thanks in advance
Lasse Nedergaard