回复: 如何查看1.10的中文文档

2021-10-10 Thread wukon...@foxmail.com
hi 杨浩:
  中文版地址 https://ci.apache.org/projects/flink/flink-docs-release-1.10/
  可以看到最左侧 下面有个 Pick Docs Version 可以通过这里选择所有版本的文档



wukon...@foxmail.com
 
发件人: 杨浩
发送时间: 2021-10-09 10:57
收件人: user-zh
主题: 如何查看1.10的中文文档
我们公司用的flink版本是release-1.10,请问如何查看该版本的中文文档,
 
 
英文文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/
中文只能看最新的:https://flink.apache.org/zh/flink-architecture.html
 


????: flink 1.13.2 ????avg??????int????????????????????????int??????????????????

2021-09-29 Thread wukon...@foxmail.com
Hi Asahi :
    ??int ??cast( xx as Decimal)  



wukon...@foxmail.com
 
 Asahi Lee
?? 2021-09-27 14:29
 user-zh
?? flink 1.13.2 
avg??intint??
hi!   ??flink 1.13.2?? int 
??avg?? int  
double??decimal??bug 


????: ?????? flink sql????????????????sink table?

2021-09-25 Thread wukon...@foxmail.com
hi : 
UDF ??SQL ?? 
?? topic ??
?? https://mp.weixin.qq.com/s/IKzCRTh8eamk6TX7_RHYdQ



wukon...@foxmail.com
 
 JasonLee
?? 2021-09-23 21:56
 user-zh@flink.apache.org
?? ?? flink sqlsink table?
Hi
 
 
,  SQL  SQL 
??,??
 
 
Best
JasonLee
 
 
??2021??09??23?? 09:28 ??
sql??sql??
 
iPhone
 
 
--  --
??: 2572805166 <2572805...@qq.com.INVALID
: 2021??9??23?? 09:23
??: user-zh https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like
 casel.chen ??2021??9??18?? 8:27?? kafka 
topic??topic)
 ??flink sqlsink table


????: flink cdc SQL2ES??GC overhead limit exceeded

2021-09-15 Thread wukon...@foxmail.com
hi LIYUAN:
 ??flink  




wukon...@foxmail.com
 
 LI YUAN
?? 2021-09-09 20:38
 user-zh
?? flink cdc SQL2ES??GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.rocksdb.RocksIterator.key0(Native Method)
at org.rocksdb.RocksIterator.key(RocksIterator.java:37)
at 
org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.key(RocksIteratorWrapper.java:99)
at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapIterator.loadCache(RocksDBMapState.java:670)
at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapIterator.hasNext(RocksDBMapState.java:585)
at 
org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey$1.hasNext(OuterJoinRecordStateViews.java:285)
at 
org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:199)
at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:211)
at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement2(StreamingJoinOperator.java:129)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:221)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:190)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$$Lambda$363/366743523.accept(Unknown
 Source)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$208/999379751.runDefaultAction(Unknown
 Source)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$625/601779266.run(Unknown
 Source)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
 
Environment :
 
Flink version : 1.13.1
 
Flink CDC version: 1.4.0
 
Database and version: Mysql 7.0


回复: Re: Temporal Joins 报 Currently the join key in Temporal Table Join can not be empty

2021-09-15 Thread wukon...@foxmail.com
Hi Wayne :
   请查考Flink Join 官方文档, 右侧构建侧 需要定义  PRIMARY KEY 同时 在join 条件的时候带上。
参考地址: 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/joins.html#%E5%9F%BA%E4%BA%8E%E4%BA%8B%E4%BB%B6%E6%97%B6%E9%97%B4%E7%9A%84%E6%97%B6%E6%80%81-join
 

内容: 
注意: 基于事件时间的时态表 Join 是通过左右两侧的 watermark 触发,请确保为 join 两侧的表设置了合适的 watermark。
注意: 基于事件时间的时态表 Join 的 join key 必须包含时态表的主键,例如:表 product_changelog 的主键 
P.product_id 必须包含在 join 条件 O.product_id = P.product_id 中。



wukon...@foxmail.com
 
发件人: Wayne
发送时间: 2021-09-09 23:29
收件人: user-zh
主题: Re:Re:Re: Re: Temporal Joins 报 Currently the join key in Temporal Table 
Join can not be empty
Hello 
 
 
打扰了我最近再次尝试,还是报这个错误
我的flink版本为 flink-1.12.2-bin-scala_2.12
使用sql client执行
我的sql 如下  
 
 
CREATE TABLE stub_trans (
`uuid` STRING,
`columnInfos` MAP NOT 
NULL>  NOT NULL,
procTime TIMESTAMP(3) METADATA FROM 'timestamp' , 
WATERMARK FOR procTime AS procTime   
) WITH (
'connector' = 'kafka',
..
'format' = 'avro'
);
 
 
CREATE TABLE kapeng_test (
 
 
`event_id` STRING,
`genre_id` STRING,
`user_guid` STRING,
procTime TIMESTAMP(3) METADATA FROM 'timestamp'  ,  
WATERMARK FOR procTime AS procTime   
) WITH (
'connector' = 'kafka',

'format' = 'avro'
);
 
 
 
 
CREATE TABLE purchase_hist (
rowkey STRING,
d ROW < cost STRING, crdate STRING, currency STRING, eventid STRING, 
genreid STRING, quantity STRING >,
PRIMARY KEY ( rowkey ) NOT ENFORCED 
) WITH ( 
'connector' = 'hbase-1.4', 
.);
 
 
 
 
INSERT INTO purchase_hist SELECT
rowkey,
ROW ( cost, crdate, currency, eventid, genreid, quantity ) 
FROM
( SELECT
CONCAT_WS(
'_',
user_guid,
CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 
'-MM-dd:HH:mm:ss.S' ) AS BIGINT ) AS STRING )) AS rowkey,
columnInfos [ 'TICKET_COST' ].newValue AS cost,
DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 
'-MM-dd:HH:mm:ss.S' ), '-MM-dd' ) AS crdate,
columnInfos [ 'EVENT_ID' ].newValue AS eventid,
columnInfos [ 'CURRENCY_CODE' ].newValue AS currency,
columnInfos [ 'QUANTITY' ].newValue AS quantity,
genre_id AS genreid,
user_guid AS userGuid 
FROM
stub_trans
   LEFT  JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON 
stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id )m
 
 
 
 
报错如下
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Currently the join key in Temporal Table Join can not be empty.
at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.immutable.Range.foreach(Range.scala:158)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(Traver

Re: hbase 列设置TTL过期后,flink不能再写入数据

2021-09-15 Thread wukon...@foxmail.com
hi :
 你是如何定义的source 表 和sink 表的吗? 能贴出来具体的建表语句吗? 



wukon...@foxmail.com
 
发件人: xiaohui zhang
发送时间: 2021-09-09 17:19
收件人: user-zh
主题: hbase 列设置TTL过期后,flink不能再写入数据
Flink:1.12.1
Flink-connector: 2.2
Hbase: 2.1.0 + CDH6.3.2
现象:如果hbase列族设置了TTL,当某一rowkey写入数据,到达过期时间,列族会被hbase标记为删除。
后续如果有相同key的数据过来,flink无法将数据写入到hbase中,查询hbase中列族一直为空。
 
执行的过程大致如下:
创建Hbase表,test, 两个列族 cf1 , TTL 60, cf2, TTL 120,
数据TTL分别为1分钟,2分钟。
使用sql写入数据至表中
 
insert into test
select
'rowkey',
ROW('123'),
ROW('456')
from
   sometable;
 
过一分钟后,通过hbase 查询,可发现无cf1数据,两分钟后该rowkey无对应数据。
此时再通过flink写入数据,发现无法写入,且flink不报错
 
请问这个情况是Bug,还是Hbase的问题呢?


回复: apache-flink

2021-09-06 Thread wukon...@foxmail.com
Hi 
  邮件组里没有看到你发的附件 和截图  可以尝试引用外部文档链接方式 来描述问题
 
发件人: 杜畅
发送时间: 2021-09-06 10:43
收件人: user-zh
主题: apache-flink
 flink 1.13.2 流作业消费kafka消息 实时存储到hive中,作业平台发现job第三步没有消息sent 
从网上搜索均无此解决方案,代码也是从官网仿照的。
望帮看下是什么原因

截图和代码请见附件


 


回复: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

2021-08-30 Thread wukon...@foxmail.com
我目前大概会采用 Shuo Cheng 提到的, 使用先sink 到Mysql, 再启一个任务 cdc mysql 中的表,这样能保证插入成功后的数据。

我目前使用的是flink 1.12 版本 如果是多端sink 比如 sink db 同时sink kafka ,flink 在sink db 
失败,依旧会sink kafka 但是会因为异常,发生tm 重启,会根据自定义重启策略,一直到最后 整个job fail over 掉。



wukon...@foxmail.com
 
发件人: 东东
发送时间: 2021-08-30 16:50
收件人: user-zh
主题: Re:Re: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
 
 
 
对于Exactly-Once模式下的checkpoint,如果sink端支持两段式事务,应该就可以做到一个sink失败,整个checkpoint失败的。
 
 
不过JDBC sink支持Exactly-Once是1.13.0以后的事情了,建议检查一下你的版本和配置
 
 
 
 
 
 
 
 
 
在 2021-08-30 16:27:24,"wukon...@foxmail.com"  写道:
>Hi: 
> 我理解这种方式, 目前我只是想让先插入数据到Mysql 然后再通过通知到下游,来查询Mysql 进行数据etl 不知道大家如何通过SQL来实现这一逻辑
>
>
>
>wukon...@foxmail.com
> 
>发件人: Shuo Cheng
>发送时间: 2021-08-30 10:19
>收件人: user-zh
>主题: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>你好, 你说的这种控制写入的方式在同一个 Flink SQL job 里是无法实现的. 控制数据是否写入某个
>Sink,可以看看是否从逻辑上能在 Sink 前加一个 Filter,从而达到过滤目的;如果 kafka sink 跟 MySQL
>表是一种类似级联的关系, 可以考虑先写入 MySQL, 然后另起一个 Job 用 CDC 方式读 MySQL changelog 再写入
>Kafka sink.
> 
>On 8/26/21, jie han  wrote:
>> HI:
>> 可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀
>>
>> 悟空  于2021年8月26日周四 下午1:54写道:
>>
>>> 我目前用的是flink-connector-kafka_2.11和flink-connector-jdbc_2.11,
>>> 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段,
>>> 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。
>>> 但是接着sink Kafka 是成功的,Kafka端 我开启了'sink.semantic' = 'exactly-once',
>>> 同时下游consumer 使用--isolation-level read_committed
>>> 读取,依旧能成功读取到数据,说明sink
>>> db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。
>>>
>>>
>>>
>>>
>>> --原始邮件--
>>> 发件人:
>>>   "user-zh"
>>> <
>>> tsreape...@gmail.com;
>>> 发送时间:2021年8月26日(星期四) 中午1:25
>>> 收件人:"user-zh">>
>>> 主题:Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>>>
>>>
>>>
>>> Hi!
>>>
>>> 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入
>>> db
>>> 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?
>>>
>>> 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
>>> Flink CDC connector[1]
>>>
>>> [1] https://github.com/ververica/flink-cdc-connectors
>>>
>>> 悟空 >>
>>>  能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
>>>  加入的,然后执行execute()方法
>>> 
>>> 
>>> 
>>> 
>>>  --nbsp;原始邮件nbsp;--
>>>  发件人:
>>> 
>>> "user-zh"
>>> 
>>> <
>>>  fskm...@gmail.comgt;;
>>>  发送时间:nbsp;2021年8月26日(星期四) 中午12:36
>>>  收件人:nbsp;"user-zh">> 
>>>  主题:nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>>> 
>>> 
>>> 
>>>  说的是 statement set [1] 吗 ?
>>> 
>>>  [1]
>>> 
>>> 
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
>>> 
>>> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements>
>>> ;
>>>  悟空 >> 
>>>  gt; hi all:amp;nbsp;
>>>  gt; amp;nbsp; amp;nbsp; 我目前基于flink 1.12 sql 来开发功能,
>>> 目前遇到一个问题, 我现在想实现
>>>  在一个事务里 先将kafka
>>>  gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
>>>  gt; amp;nbsp; amp;nbsp;语句类似这种:
>>>  gt; amp;nbsp; amp;nbsp;insert into
>>> db_table_sinkamp;nbsp;select *
>>>  fromamp;nbsp;
>>>  gt; kafka_source_table;
>>>  gt; amp;nbsp; amp;nbsp;insert into kafka_table_sink
>>> select * from
>>>  kafka_source_table;
>>>  gt;
>>>  gt;
>>>  gt; amp;nbsp; 请问flink SQL 有实现方式吗?
>>> 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
>>>  程序没有挂掉。
>>


Re: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

2021-08-30 Thread wukon...@foxmail.com
Hi: 
 我理解这种方式, 目前我只是想让先插入数据到Mysql 然后再通过通知到下游,来查询Mysql 进行数据etl 不知道大家如何通过SQL来实现这一逻辑



wukon...@foxmail.com
 
发件人: Shuo Cheng
发送时间: 2021-08-30 10:19
收件人: user-zh
主题: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
你好, 你说的这种控制写入的方式在同一个 Flink SQL job 里是无法实现的. 控制数据是否写入某个
Sink,可以看看是否从逻辑上能在 Sink 前加一个 Filter,从而达到过滤目的;如果 kafka sink 跟 MySQL
表是一种类似级联的关系, 可以考虑先写入 MySQL, 然后另起一个 Job 用 CDC 方式读 MySQL changelog 再写入
Kafka sink.
 
On 8/26/21, jie han  wrote:
> HI:
> 可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀
>
> 悟空  于2021年8月26日周四 下午1:54写道:
>
>> 我目前用的是flink-connector-kafka_2.11和flink-connector-jdbc_2.11,
>> 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段,
>> 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。
>> 但是接着sink Kafka 是成功的,Kafka端 我开启了'sink.semantic' = 'exactly-once',
>> 同时下游consumer 使用--isolation-level read_committed
>> 读取,依旧能成功读取到数据,说明sink
>> db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。
>>
>>
>>
>>
>> --原始邮件--
>> 发件人:
>>   "user-zh"
>> <
>> tsreape...@gmail.com;
>> 发送时间:2021年8月26日(星期四) 中午1:25
>> 收件人:"user-zh">
>> 主题:Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>>
>>
>>
>> Hi!
>>
>> 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入
>> db
>> 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?
>>
>> 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
>> Flink CDC connector[1]
>>
>> [1] https://github.com/ververica/flink-cdc-connectors
>>
>> 悟空 >
>>  能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
>>  加入的,然后执行execute()方法
>> 
>> 
>> 
>> 
>>  --nbsp;原始邮件nbsp;--
>>  发件人:
>> 
>> "user-zh"
>> 
>> <
>>  fskm...@gmail.comgt;;
>>  发送时间:nbsp;2021年8月26日(星期四) 中午12:36
>>  收件人:nbsp;"user-zh"> 
>>  主题:nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>> 
>> 
>> 
>>  说的是 statement set [1] 吗 ?
>> 
>>  [1]
>> 
>> 
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
>> 
>> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements>
>> ;
>>  悟空 > 
>>  gt; hi all:amp;nbsp;
>>  gt; amp;nbsp; amp;nbsp; 我目前基于flink 1.12 sql 来开发功能,
>> 目前遇到一个问题, 我现在想实现
>>  在一个事务里 先将kafka
>>  gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
>>  gt; amp;nbsp; amp;nbsp;语句类似这种:
>>  gt; amp;nbsp; amp;nbsp;insert into
>> db_table_sinkamp;nbsp;select *
>>  fromamp;nbsp;
>>  gt; kafka_source_table;
>>  gt; amp;nbsp; amp;nbsp;insert into kafka_table_sink
>> select * from
>>  kafka_source_table;
>>  gt;
>>  gt;
>>  gt; amp;nbsp; 请问flink SQL 有实现方式吗?
>> 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
>>  程序没有挂掉。
>


Re: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中

2021-08-30 Thread wukon...@foxmail.com
能具体说下如何实现吗? 我用cdc 能实现什么,我现在想让两个Insert Sql 保持到一个事务里, 要么全成功,要么全失败,目前查看Flink 文档 
并没有发现相关的解释



wukon...@foxmail.com
 
发件人: jie han
发送时间: 2021-08-26 21:36
收件人: user-zh
主题: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
HI:
可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀
 
悟空  于2021年8月26日周四 下午1:54写道:
 
> 我目前用的是flink-connector-kafka_2.11和flink-connector-jdbc_2.11,
> 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段,
> 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。
> 但是接着sink Kafka 是成功的,Kafka端 我开启了'sink.semantic' = 'exactly-once',
> 同时下游consumer 使用--isolation-level read_committed 读取,依旧能成功读取到数据,说明sink
> db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> tsreape...@gmail.com;
> 发送时间:2021年8月26日(星期四) 中午1:25
> 收件人:"user-zh"
> 主题:Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
>
>
>
> Hi!
>
> 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入 db
> 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象?
>
> 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下
> Flink CDC connector[1]
>
> [1] https://github.com/ververica/flink-cdc-connectors
>
> 悟空 
>  能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql
>  加入的,然后执行execute()方法
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:
> 
> "user-zh"
> 
> <
>  fskm...@gmail.comgt;;
>  发送时间:nbsp;2021年8月26日(星期四) 中午12:36
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中
> 
> 
> 
>  说的是 statement set [1] 吗 ?
> 
>  [1]
> 
> 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements
> 
> <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements>
> ;
>  悟空  
>  gt; hi all:amp;nbsp;
>  gt; amp;nbsp; amp;nbsp; 我目前基于flink 1.12 sql 来开发功能,
> 目前遇到一个问题, 我现在想实现
>  在一个事务里 先将kafka
>  gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。
>  gt; amp;nbsp; amp;nbsp;语句类似这种:
>  gt; amp;nbsp; amp;nbsp;insert into
> db_table_sinkamp;nbsp;select *
>  fromamp;nbsp;
>  gt; kafka_source_table;
>  gt; amp;nbsp; amp;nbsp;insert into kafka_table_sink
> select * from
>  kafka_source_table;
>  gt;
>  gt;
>  gt; amp;nbsp; 请问flink SQL 有实现方式吗?
> 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink
>  程序没有挂掉。