Re:回复: flink sql消费kafka sink到mysql问题
发现是flink sql 消费kafka 不管有没有解析成功。先去提交offset到kafka 但是实际 是解析失败了。 在 2021-01-06 14:01:34,"Evan" 写道: >flinksql 貌似是目前做不到你说的这样 > > > > >发件人: air23 >发送时间: 2021-01-06 12:29 >收件人: user-zh >主题: flink sql消费kafka sink到mysql问题 >你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了 >然后再重启 发现报错的数据 会丢失 >采用的scan.startup.mode' = 'group-offsets' >按理说 不是要重新消费 失败的那条数据 开始消费吗? >请问如何配置 可以不丢失数据 > > >CREATE TABLE source1 ( >id BIGINT , >username STRING , >password STRING , >AddTime TIMESTAMP , >origin_table STRING METADATA FROM 'value.table' VIRTUAL, >origin_sql_type MAP METADATA FROM 'value.sql-type' VIRTUAL >) WITH ( >'connector' = 'kafka', >'topic' = 'plink_canal', >'properties.bootstrap.servers' = '***', >'properties.group.id' = 'canal1', >'scan.startup.mode' = 'group-offsets', >'canal-json.table.include' = 'test.*', >-- 'canal-json.ignore-parse-errors' = 'true', -- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 >false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null >'format' = 'canal-json' >);
回复: flink sql消费kafka sink到mysql问题
flinksql 貌似是目前做不到你说的这样 发件人: air23 发送时间: 2021-01-06 12:29 收件人: user-zh 主题: flink sql消费kafka sink到mysql问题 你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了 然后再重启 发现报错的数据 会丢失 采用的scan.startup.mode' = 'group-offsets' 按理说 不是要重新消费 失败的那条数据 开始消费吗? 请问如何配置 可以不丢失数据 CREATE TABLE source1 ( id BIGINT , username STRING , password STRING , AddTime TIMESTAMP , origin_table STRING METADATA FROM 'value.table' VIRTUAL, origin_sql_type MAP METADATA FROM 'value.sql-type' VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'plink_canal', 'properties.bootstrap.servers' = '***', 'properties.group.id' = 'canal1', 'scan.startup.mode' = 'group-offsets', 'canal-json.table.include' = 'test.*', -- 'canal-json.ignore-parse-errors' = 'true', -- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null 'format' = 'canal-json' );
flink sql消费kafka sink到mysql问题
你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了 然后再重启 发现报错的数据 会丢失 采用的scan.startup.mode' = 'group-offsets' 按理说 不是要重新消费 失败的那条数据 开始消费吗? 请问如何配置 可以不丢失数据 CREATE TABLE source1 ( id BIGINT , username STRING , password STRING , AddTime TIMESTAMP , origin_table STRING METADATA FROM 'value.table' VIRTUAL, origin_sql_type MAP METADATA FROM 'value.sql-type' VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'plink_canal', 'properties.bootstrap.servers' = '***', 'properties.group.id' = 'canal1', 'scan.startup.mode' = 'group-offsets', 'canal-json.table.include' = 'test.*', -- 'canal-json.ignore-parse-errors' = 'true', -- 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null 'format' = 'canal-json' );