Re: flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?

2020-07-01 文章
我们正准备开发这个功能,详情可以参考:https://issues.apache.org/jira/browse/FLINK-15221

夏帅  于2020年7月1日周三 下午3:13写道:

> 你好,可以尝试自定义实现Kafka011TableSourceSinkFactory和Kafka011TableSink来实现exactly-once
>
> Kafka011TableSink
>
>
> @Override
> protected SinkFunction createKafkaProducer(
>   String topic,
>   Properties properties,
>   SerializationSchema serializationSchema,
>   Optional> partitioner) {
>return new FlinkKafkaProducer011<>(
>   topic,
>   new KeyedSerializationSchemaWrapper<>(serializationSchema),
>   properties,
>   partitioner,
>   FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
>   5);
> }
> 如果想要修改配置的话,具体可以参考KafkaTableSourceSinkFactoryBase
>
> 参考:
> https://jxeditor.github.io/2020/06/11/FlinkSQL%E5%9C%A8%E4%BD%BF%E7%94%A8%E5%88%9B%E5%BB%BA%E8%A1%A8%E8%AF%AD%E5%8F%A5%E6%97%B6%E7%9A%84%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
> --
> 发件人:静谧雨寒 
> 发送时间:2020年7月1日(星期三) 14:33
> 收件人:user-zh 
> 主 题:flink sql ddl CREATE TABLE kafka011 sink 如何开启事务exactly-once?
>
>  flink sql CREATE TABLE kafka sink表,开启checkpoint后,如何配置sql
> sink表使用两阶事务提交,exactly-once一致性保证 ?
> 官档说法:
> Consistency guarantees: By default, a Kafka sink ingests data with
> at-least-once guarantees into a Kafka topic if the query is executed with
> checkpointing enabled.,  
> CREATE TABLE 默认是 at-least-once
>
>


Re: kafka相关问题

2020-06-10 文章
那你有没有尝试过修改connector中property中connector.startup-mode
设置为latest-offset,这样子每次从kafka读取都是读取最新的消息。
另外,我想问一下 你的sql是一直运行的吗?
我给的limit方案是一个upersert流。

小学生 <201782...@qq.com> 于2020年6月10日周三 下午5:31写道:

> limit 没有用呀。有没有切实可行的方案呢,pyflink下。


Re: kafka相关问题

2020-06-10 文章
那你可以调整sql语句,例如使用limit关键字, topN 或使用自定义的方法,具体的你可以根据你的sql语句不断调整.
如有错误欢迎指正

小学生 <201782...@qq.com> 于2020年6月10日周三 下午3:26写道:

> 您好,我是通过select * from
> table_ddl这个去触发的,但是就是因为table_ddl的不断增大,脱离我的业务需要,我业务需要的就是触发select * from
> table_ddl这个时候,只取最新的一条数据(就是保证table_ddl表总只有一条数据)


Re: kafka相关问题

2020-06-10 文章
我认为创建一张表并不会触发从source读取的操作,除非你使用select * from table_ddl这类的操作。
至于表的增大,显然是的。 我觉得你可能关注的是否会flush数据进入存储系统,保证不会out-of-memory这类的问题吗?
我个人猜可能有两种方案:
1.考虑到kafka是一种已经实现持久化的数据源,因此我们并非需要将所有的数据缓存到保存到本地,可以只记录读取到的数据位置,下次读取从记录的位置开始;
2.定期向文件系统写入数据。


小学生 <201782...@qq.com> 于2020年6月10日周三 下午2:48写道:

> 各位大佬好,请教一个问题:
> 利用kafka作为流式源,注册内部的flink的动态表,如下面的table_ddl ,是否由于'update-mode' =
> 'append',所以随着kafka消息逐步的推送,会导致table_ddl这个表的数据不断在增大,但是我想要的是kafka每推送一条数据,table_ddl就会相应的是这一条数据,请问这个如何实现呢?(或者有没有啥办法,可以直接从table_ddl中取最新的kafka数据呢)。
>
>
> table_ddl = """
> CREATE TABLE table_ddl (
>  trck_id VARCHAR
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = 'universal',    
>  'connector.topic' = 'w',    
>  'connector.startup-mode' = 'group-offsets',
>  'connector.properties.group.id' = 'trck_w', 
>  'update-mode' = 'append',
>  'connector.properties.zookeeper.connect' = '*',
>  'connector.properties.bootstrap.servers' = '%#',
>  'format.type' = 'json',     
>  'format.derive-schema' = 'true' 
> )
> """


Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

2020-06-09 文章
我又仔细读了文档和代码,显然org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint中是通过operator
ID进行匹配,也就是说,只有在新作业的operator id和savepoint中的id相匹配的情况下,才允许加载成功。 而operator
id的生成有两种方法:1.用户制定;2. 自动生成。 显然你的作业中没有指定相应的id,
因此flink会为你自动生成相应的id,但是operator的id生成方法对结构非常敏感,显然由于作业的修改导致了新旧两个作业的生成operator
id不一致。具体的可以参考文档 我提到的文档中Assiging Operator IDs这一节的内容。
注意: sql语句并不能和operator画上等号,也就是说一个sql语句并不代表一个operator。

方盛凯  于2020年6月9日周二 下午9:26写道:

>
> 可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
> 对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html
>
> 如有错误,欢迎补充回答。
>
> 陈赋赟  于2020年6月8日周一 上午11:53写道:
>
>> 原先sql任务是:
>> CREATE TABLE A_source(...)
>> CREATE TABLE B_sink (...)
>> INSERT INTO B_sink
>> SELECT
>>  1
>> FROM
>> A_source
>> ;
>> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
>>
>>
>> CREATE TABLE A_source(...)
>> CREATE TABLE B_sink (...)
>> CREATE TABLE C_source(...)
>> CREATE TABLE D_sink (...)
>> INSERT INTO B_sink
>> SELECT
>>  1
>> FROM
>> A_source
>> ;
>>
>>
>> INSERT INTO C_sink
>> SELECT
>>  1
>> FROM
>> D_source
>> ;
>> 并基于Savepoint提交,结果显示
>>
>> Cannot map checkpoint/savepoint state for operator
>> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
>> is not available in the new program.
>> If you want to allow to skip this, you can set the
>> --allowNonRestoredState option on the CLI.
>>
>>
>> 想请教一下底层是因为什么原因导致了opertor匹配不上?
>
>


Re: 关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

2020-06-09 文章
可以查看org.apache.flink.runtime.checkpoint#loadAndValidateCheckpoint方法了解为什么加载失败了
对于这个问题,文档和代码中都提供了解决办法,就是使用 --allowNonRestoredState参数,具体使用方法参见文档
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html

如有错误,欢迎补充回答。

陈赋赟  于2020年6月8日周一 上午11:53写道:

> 原先sql任务是:
> CREATE TABLE A_source(...)
> CREATE TABLE B_sink (...)
> INSERT INTO B_sink
> SELECT
>  1
> FROM
> A_source
> ;
> 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为
>
>
> CREATE TABLE A_source(...)
> CREATE TABLE B_sink (...)
> CREATE TABLE C_source(...)
> CREATE TABLE D_sink (...)
> INSERT INTO B_sink
> SELECT
>  1
> FROM
> A_source
> ;
>
>
> INSERT INTO C_sink
> SELECT
>  1
> FROM
> D_source
> ;
> 并基于Savepoint提交,结果显示
>
> Cannot map checkpoint/savepoint state for operator
> 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator
> is not available in the new program.
> If you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.
>
>
> 想请教一下底层是因为什么原因导致了opertor匹配不上?