Re:回复:flink 从mysql读取数据异常

2021-03-29 文章 air23
这边是想离线读取。不是走实时的 看到异常是 Only insert statement is supported now 在 2021-03-30 10:31:51,"guoyb" <861277...@qq.com> 写道: >可以读取的,还有内置flink cdc >select得用query方法,看看是不是用错了execute。 > > > >---原始邮件--- >发件人: "air23"发送时间: 2021年3月30日(周二) 上午10:25 >

flink 从mysql读取数据异常

2021-03-29 文章 air23
你好 参考官网 https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html 这边读取mysql jdbc数据报错Exception in thread "main" org.apache.flink.table.api.TableException: Only insert statement is supported now. String a = "-- register a MySQL table 'users' in Flink SQL\n"

flink 从mysql读取数据异常

2021-03-29 文章 air23
你好 参考官网 https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html 这边读取mysql jdbc数据报错Exception in thread "main" org.apache.flink.table.api.TableException: Only insert statement is supported now. String a = "-- register a MySQL table 'users' in Flink SQL\n"

Re:Re: 回复:flink怎么读kafka offset

2021-01-18 文章 air23
是可以这样恢复。 但是如果使用setStartFromGroupOffsets() 如果中间程序挂了, 会导致消费者已经提交了offset,但是下游数据没有处理完 ,或者没有sink到下游 下次重启会从kakfa的偏移量开始消费 没有处理的数据,会丢失 在 2021-01-18 10:50:44,"hoose" <307840...@qq.com> 写道: >我理解的是这样,虽然不是从savepoint里恢复,但kafka >consumer_topic有了我们之前保存的groupid,那么默认的是flink是这样设置: >

Re:Re:flinksql 消费kafka offset问题

2021-01-14 文章 air23
我的意思 是不使用checkpoint。 使用'scan.startup.mode' = 'group-offsets' 去维护offset 在 2021-01-15 11:35:16,"Michael Ran" 写道: >下游sink还没有完成, offset 不是在checkpoint 里面的吗?下次启动会从你ck的位置恢复才对。除非你sink >是异步操作,告诉上游你sink 完成了,实际你sink失败了 >在 2021-01-15 10:29:15,"air23" 写道: &g

flinksql 消费kafka offset问题

2021-01-14 文章 air23
flink消费kafka 只能使用checkpoint去维护offset吗 我这边使用'scan.startup.mode' = 'group-offsets' 如果中间报错了 或者停止任务,但是我下游sink还没有完成, 下次启动直接跳过这个报错的数据,会丢数据,谢谢回复

Re:Sql Client读取Kafka报错:Could not find any factory for identifier 'kafka'

2021-01-10 文章 air23
下载个 flink-sql-connector-kafka 这个jar 放在lib下试下 在 2021-01-09 02:08:12,"inza9hi" 写道: >搜了下之前的邮件,貌似没有发现和我同样的问题。 > >lib 下的Jar >flink-csv-1.11.3.jar >flink-table-blink_2.11-1.11.3.jar >flink-dist_2.11-1.11.3.jar

Re:回复: flink sql消费kafka sink到mysql问题

2021-01-05 文章 air23
发现是flink sql 消费kafka 不管有没有解析成功。先去提交offset到kafka 但是实际 是解析失败了。 在 2021-01-06 14:01:34,"Evan" 写道: >flinksql 貌似是目前做不到你说的这样 > > > > >发件人: air23 >发送时间: 2021-01-06 12:29 >收件人: user-zh >主题: flink sql消费kafka sink到mysql问题 >你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了

flink sql消费kafka sink到mysql问题

2021-01-05 文章 air23
你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了 然后再重启 发现报错的数据 会丢失 采用的scan.startup.mode' = 'group-offsets' 按理说 不是要重新消费 失败的那条数据 开始消费吗? 请问如何配置 可以不丢失数据 CREATE TABLE source1 ( id BIGINT , username STRING , password STRING , AddTime TIMESTAMP , origin_table STRING METADATA FROM

flink timestamp 解析问题

2021-01-05 文章 air23
你好 这边使用flink sql有如下问题; CREATE TABLE source1 ( id BIGINT , username STRING , password STRING , AddTime TIMESTAMP , origin_table STRING METADATA FROM 'value.table' VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'plink_canal', 'properties.bootstrap.servers' = '',

canal-json format优化问题

2020-12-30 文章 air23
在官方文档上 看到 canal-json format 有如下两个key canal-json.database.include canal-json.table.include 这2个key 看源码是equals等于关系,而不是include包含关系 ,是否这2个字段可以配置正则表达式来包含配置的表 来支持呢,如 "'canal-json.table.include' = 'test*',"+ 因为可能会用到分库分表的flink场景。 谢谢

Re:Re: canal-json 分库分表场景应用

2020-11-26 文章 air23
是的 感觉国内canal用户应该比debezium用户多很多,谢谢支持 在 2020-11-27 11:40:55,"Leonard Xu" 写道: >Hi, air23 > >国内用户用Canal-json还是很多的,我建了个issue 来支持, 你可以关注下。 > >> 你好 请问这个debezium-json 这个value.source.table 功能 在1.12的canal-json会实现吗, >> 看到canal-json代码里面 好像是有这部

Re:Re: canal-json 分库分表场景应用

2020-11-26 文章 air23
bezium/DebeziumJsonDecodingFormat.java#L136 > >On Fri, 27 Nov 2020 at 09:49, air23 wrote: > >> 你好 我这边有很多这种场景,把分库分表的binlog 写入到同一个topic 然后用canal-json解析,这边想获取到json里面的table >> 字段, >> >> 然后根据 table名称加主键 写入到下游 合成一张表,写入到下游表, >> >> 然后发现 canal-json 是获取不到表名的,然后这边去修改c

canal-json 分库分表场景应用

2020-11-26 文章 air23
你好 我这边有很多这种场景,把分库分表的binlog 写入到同一个topic 然后用canal-json解析,这边想获取到json里面的table 字段, 然后根据 table名称加主键 写入到下游 合成一张表,写入到下游表, 然后发现 canal-json 是获取不到表名的,然后这边去修改canal-json的format。 添加 createJsonRowType方法的DataTypes.FIELD("table", DataTypes.STRING()) 然后在deserialize方法里面把 table字段set到 data里面去。但是发现这种好像是不成功的 ,请问下

Re:Re: flink on yarn 任务FAILED后 没有错误日志 输出到yarn log

2020-11-20 文章 air23
但是 在yarn上跑的spark 任务 都是可以看到错误日志的, flink这边配置的是log4j的日志文件,本地运行 控制台是可以看到错误原因 和日志 在 2020-11-20 17:53:03,"caozhen" 写道: > >1、jobmanager的日志有没有错误呢? >2、或者通过yarn history查下日志 yarn logs -applicationId xxx >3、如果是flink client 提交作业可以看下客户端日志 > > > >air23 wrote >> 你好 &

Re:Re: flink on yarn 任务FAILED后 没有错误日志 输出到yarn log

2020-11-20 文章 air23
yarn logs -applicationId xxx 和 yarn 历史log 都查看不到FAILED 错误日志。 在 2020-11-20 17:53:03,"caozhen" 写道: > >1、jobmanager的日志有没有错误呢? >2、或者通过yarn history查下日志 yarn logs -applicationId xxx >3、如果是flink client 提交作业可以看下客户端日志 > > > >air23 wrote >> 你好 >> flink on yarn

Re:回复: Flink mysqlCDC ,然后jdbc sink 到mysql 乱序问题

2020-10-26 文章 air23
你好 主要流程 见附件 流程就是使用cdc 读取mysql。然后left join 维度表 ,最后写入到mysql 问题是测试的时候。update cdc的源表一条数据。发现结果的数据 有时候有 有时候没有, 使用connect=print 发现两条数据流。一个是delete 一个是insert,这边怀疑是乱序 导致先insert 在delete掉了, 把并行度设置为1的时候。就是正常的。如果沟通不方便 欢迎加钉钉13269166963。 在 2020-10-26 12:11:59,"史 正超" 写道: >Hi, @ai

Re:??????Flink mysqlCDC ,????jdbc sink ??mysql ????????

2020-10-25 文章 air23
?? eventtime?? ?? 2020-10-24 14:15:43??"pezynd" <284616...@qq.com> ?? >??Timestamp??Watermark > > > > >---- >??: >

Re:Re:Flink mysqlCDC ,然后jdbc sink 到mysql 乱序问题

2020-10-25 文章 air23
这边源数据 就是用cdc读取mysql。cdc不会保证有序吗 ? Forword 这个是通过什么方式保证的? 谢谢你的回复 在 2020-10-26 05:37:45,"hailongwang" <18868816...@163.com> 写道: >Hi air, >保证内部是 Forword 试试,因为内部是 Hash 或者 Rebalance 的话,就会出现相同的数据的操作记录被不同的并发处理,这样到 sink >时候就会出现乱序的可能。 > > >Best, >Hailong Wang. >

Flink mysqlCDC ,然后jdbc sink 到mysql 乱序问题

2020-10-23 文章 air23
你好, 这边发现使用cdc读取mysql ,然后写入mysql会有乱序问题 在上游mysql update一条数据,connert=print是有一条delete 和一条insert的数据, 但是jdbc写入mysql时候 发现mysql有时候是正常的,但是有时候会没有,当把并行度改成1的时候是正常的。 这边怀疑是乱序了,先insert 再delete了。所以导致结果表 没有这条数据,请问flink sql 或者flink cdc 怎么保证有序。

flink cdc读取mysql,执行sql业务逻辑, sink到mysql的bug

2020-10-20 文章 air23
大佬您好,mysql cdc 修改源表的数据,sink到目标表记录为空,但是再修改一次之后,目标表就有数据了。 例如: insert into student_info select a.id, a.name, b.name from a(id, name) left join b(id, class) 原始数据 a id name 1 xm b id class 1 1班 所以 student_info 有一条数据 1, xm, 1班 此时 a表的name修改为 -> xh student_info 的数据就变成 null, null, null

Flink Sql client on yarn 问题

2020-10-09 文章 air23
你好。我在sql client 配置的yarn yarn日志报错如下 2020-10-09 14:17:37,721 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 2020-10-09 14:17:37,726 INFO

Re:Re: flink canal-json 如何获取每条数据是updata insert delete

2020-09-27 文章 air23
>如果要抽取对应的 type,需要用 format = json, 把 json 的完整数据结构在 DDL 中声明出来(包括 type)。 > >目前 canal-json 是自动将 changelog 转成了 Flink 的 insert/update/delete,而这个 change >flag 目前是不对用户暴露的。 > >Best, >Jark > >On Fri, 25 Sep 2020 at 09:39, air23 wrote: > >> 你好 >> flink canal-json 如何获

flink canal-json 如何获取每条数据是updata insert delete

2020-09-24 文章 air23
你好 flink canal-json 如何获取每条数据是updata insert delete ,我ddl插件kafka表 用对应的type取 都是为null 这个操作类型 有办法取到吗?谢谢

一个main方法启动2个yarn job问题

2020-08-27 文章 air23
你好。我有一个接kafka 写入tidb的任务 为什么会启动2个yarn任务去运行呢? 我是先用datastream 接入kafka。然后转成table sql写入到tidb 2个job name 一个叫Env.execute配置的名字 一个是叫insert 写入tidb的sql语句名字

Re:Re: Flink SQL 问题;

2020-08-27 文章 air23
这个需要单独 导入到服务器lib下面吗 我的本地pom文件 是有引入的 在 2020-08-27 15:32:49,"Lin Hou" 写道: >jdbc connector jar没有导入线上集群,去官网上下载或者自己编译吧 > >air23 于2020年8月27日周四 下午3:11写道: > >> 你好 我用idea本地运行的jdbc sink没有问题。但是在服务器上运行报错如下 >> >> >> 麻烦帮忙看下什么问题 是1.1

Flink SQL 问题;

2020-08-27 文章 air23
你好 我用idea本地运行的jdbc sink没有问题。但是在服务器上运行报错如下 麻烦帮忙看下什么问题 是1.11 版本的 'connector'='jdbc' 'password'='' 'sink.buffer-flush.interval'='10s' 'sink.buffer-flush.max-rows'='500' 'table-name'='flink_test3' 'url'='jdbc:mysql://**:4000/test' 'username'='root' at

In the future,the community plans to extend its functionality by providing a REST-based SQL

2020-08-06 文章 air23
Limitations & Future The current SQL Client only supports embedded mode. In the future, the community plans to extend its functionality by providing a REST-based SQL Client Gateway, see more in FLIP-24 and FLIP-91. 你好 在官方文档上看到了。请问这个还在计划中。是一种rest sql web客户端吗?

Re:Re:写入hive 问题

2020-08-05 文章 air23
你好 谢谢。去掉版本号 确实可以了。我用的版本 和我安装的hive版本是一致的。不知道是什么原因导致的。 在 2020-08-05 15:59:06,"wldd" 写道: >hi: >1.你可以看下你配置hive catalog时的hive版本和你当前使用的hive版本是否一致 >2.你也可以尝试在配置hive catalog的时候,不设置hive版本 > > > > > > > > > > > > > >-- >

写入hive 问题

2020-08-05 文章 air23
你好 15:33:59,781 INFO org.apache.flink.table.catalog.hive.HiveCatalog - Created HiveCatalog 'myhive1' Exception in thread "main" org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client at

stream sink hive 在hdfs ha模式下

2020-08-02 文章 air23
hi 你好 我这边集群是cdh的。 配置了hdfs ha模式 在使用 kafka sink 到hive 时候找不到nameservices java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservices1 请问 在ha模式下 应该怎么配置

Re:Re: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。 >1.12 中已经支持读取复杂结构为 string 类型了。 > >Best, >Jark > >On Tue, 28 Jul 2020 at 15:36, air23 wrote: > >> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 >> >> >> { >> "data":[

Re:回复: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
你好 使用的是1.11.1版本的 在 2020-07-28 16:02:30,"明启 孙" <374060...@qq.com> 写道: >你的flink什么版本 > >发送自 Windows 10 版邮件应用 > >发件人: air23 >发送时间: 2020年7月28日 15:36 >收件人: user-zh@flink.apache.org >主题: Re:Re: Re: 解析kafka的mysql binlog问题 > >格式如下 是canal解析的b

Re:Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
t; > >> 2020年7月28日 下午3:35,air23 写道: >> >> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 >> >> >> { >>"data":[ >>{ >>"op_id":"97037138", >>"order_i

Re:Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
:"", "sqlType":{ "op_id":4, "order_id":4 }, "table":"order_product", "ts":1595720375837, "type":"INSERT" } 在 2020-07-28 14:44:35,"Jark Wu" 写道: >有

Re:Re: 解析kafka的mysql binlog问题

2020-07-27 文章 air23
nventory_batch_log ,order_log ,order_address_book ,product_inventory ,order_physical_relation ,bil_business_attach ,picking_detail ,picking_detail ,orders 另外再问个问题。1.11版本 blink 不能datastream转table吗? 看到例子都是useOldPlanner 来转table的。 致谢 在 2020-07-27 19:44:10,"Jark Wu" 写道: >抱歉,还是没有看到附件

回复:解析kafka的mysql binlog问题

2020-07-27 文章 air23
我再上传一次 在2020年07月27日 18:55,Jark Wu 写道: Hi, 你的附件好像没有上传。 On Mon, 27 Jul 2020 at 18:17, air23 wrote: > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* > > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >

解析kafka的mysql binlog问题

2020-07-27 文章 air23
你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢? private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + " `data` VARCHAR , " + " `table` VARCHAR " + ") WITH (" + " 'connector' = 'kafka'," + " 'topic' = 'order_source'," + " 'properties.bootstrap.servers' =

Flink Sql 问题

2020-07-27 文章 air23
你好

做实时数仓,sql怎么保证分topic区有序

2020-07-02 文章 air23
hi 就是我用 flink sql 通过ddl读取和写入kafka怎么设置并行度呢? flink sql 通过ddl写入kafka怎么自定义分区呢? 这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置 或者做自定义分区。

Re:回复:flink on yarn报错 怎么获取

2020-06-02 文章 air23
分钟级别定时去获取metrics? 这样吗 在 2020-06-02 14:05:39,"阿华田" 写道: >这种情况需要对flink任务进行监控 获取flink的任务状态 > > >| | >阿华田 >| >| >a15733178...@163.com >| >签名由网易邮箱大师定制 > > >在2020年06月2日 14:03,air23 写道: >今天发现taskmanagers报json解析失败 他一起在重启 >但是我们这边是监控y

flink on yarn报错 怎么获取

2020-06-02 文章 air23
今天发现taskmanagers报json解析失败 他一起在重启 但是我们这边是监控yarn 任务级别的。像这种task 里面报错。yarn任务又不会挂掉。应该怎么去做监控。才能得知 程序后台有问题 谢谢

Re:Re: flink1.10 on yarn 问题

2020-05-29 文章 air23
unt By Java"); } 我现在加了flink环境变量 这个例子 可以过了。就很奇怪 在 2020-05-29 14:22:39,"tison" 写道: >然后你 execute 前后的代码片段甚至整个 main 如果可以的话通过 gist 贴一下(x) > >Best, >tison. > > >tison 于2020年5月29日周五 下午2:21写道: > >> 这个问题好诡异啊,一般来说编译会在 env.execute >> 的时候拦

Re:Re: flink1.10 on yarn 问题

2020-05-29 文章 air23
in hdfs://ZONGTENGSERIVCE/user/root/.flink/application_1590715263014_0033. 在 2020-05-29 14:21:39,"tison" 写道: >这个问题好诡异啊,一般来说编译会在 env.execute >的时候拦截,不应该真的调度起来才对。你能详细描述一下你提交作业的方法还有这个错误报在哪里吗(client?cluster?)? > >Best, >tison. > > >air23 于2020年5月29日周五 下午

flink1.10 on yarn 问题

2020-05-28 文章 air23
cdh运行flink1.10 on cdh yarn 报错如下。 用1.7.2版本就没有问题 flink-shaded-hadoop-2-uber-2.6.5-10.0.jar 也加了 hadoop环境变量 export HADOOP_CONF_DIR=/etc/hadoop/conf 求解答 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error:

Re:Re: 回复:flink 1.9 1.10 on yarn在cdh上怎么搭建一个客户端

2020-05-28 文章 air23
可以看下你的HADOOP_CONF吗。我的配置的=/etc/hadoop/conf。 开源的Hadoop版本 这个也放了 在 2020-05-28 09:36:10,"wangweigu...@stevegame.cn" 写道: > >确实,你只要配置好CDH的HADOOP_CONF环境变量,同时下载开源的Hadoop版本(和CDH版本相同)放到flink > lib下,就可以访问CDH yarn,提交作业! > >目前我这边是CDH 5.16.1,Flink 1.10,提交Flink on

flink写入hbase 报错如下 还会导致反压 任务变慢

2020-05-28 文章 air23
2020-05-28 16:54:23,867 INFO org.apache.hadoop.hbase.client.AsyncRequestFutureImpl - id=2, table=GC_SCHEM:mon1, attempt=7/16, failureCount=427ops, last exception=org.apache.hadoop.hbase.RegionTooBusyException: org.apache.hadoop.hbase.RegionTooBusyException: Above memstore limit,