这边是想离线读取。不是走实时的
看到异常是 Only insert statement is supported now
在 2021-03-30 10:31:51,"guoyb" <861277...@qq.com> 写道:
>可以读取的,还有内置flink cdc
>select得用query方法,看看是不是用错了execute。
>
>
>
>---原始邮件---
>发件人: "air23"发送时间: 2021年3月30日(周二) 上午10:25
>
你好 参考官网
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html
这边读取mysql jdbc数据报错Exception in thread "main"
org.apache.flink.table.api.TableException: Only insert statement is supported
now.
String a = "-- register a MySQL table 'users' in Flink SQL\n"
你好 参考官网
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html
这边读取mysql jdbc数据报错Exception in thread "main"
org.apache.flink.table.api.TableException: Only insert statement is supported
now.
String a = "-- register a MySQL table 'users' in Flink SQL\n"
是可以这样恢复。
但是如果使用setStartFromGroupOffsets() 如果中间程序挂了,
会导致消费者已经提交了offset,但是下游数据没有处理完 ,或者没有sink到下游
下次重启会从kakfa的偏移量开始消费
没有处理的数据,会丢失
在 2021-01-18 10:50:44,"hoose" <307840...@qq.com> 写道:
>我理解的是这样,虽然不是从savepoint里恢复,但kafka
>consumer_topic有了我们之前保存的groupid,那么默认的是flink是这样设置:
>
我的意思 是不使用checkpoint。
使用'scan.startup.mode' = 'group-offsets' 去维护offset
在 2021-01-15 11:35:16,"Michael Ran" 写道:
>下游sink还没有完成, offset 不是在checkpoint 里面的吗?下次启动会从你ck的位置恢复才对。除非你sink
>是异步操作,告诉上游你sink 完成了,实际你sink失败了
>在 2021-01-15 10:29:15,"air23" 写道:
&g
flink消费kafka 只能使用checkpoint去维护offset吗
我这边使用'scan.startup.mode' = 'group-offsets'
如果中间报错了 或者停止任务,但是我下游sink还没有完成,
下次启动直接跳过这个报错的数据,会丢数据,谢谢回复
下载个 flink-sql-connector-kafka 这个jar 放在lib下试下
在 2021-01-09 02:08:12,"inza9hi" 写道:
>搜了下之前的邮件,貌似没有发现和我同样的问题。
>
>lib 下的Jar
>flink-csv-1.11.3.jar
>flink-table-blink_2.11-1.11.3.jar
>flink-dist_2.11-1.11.3.jar
发现是flink sql 消费kafka 不管有没有解析成功。先去提交offset到kafka 但是实际 是解析失败了。
在 2021-01-06 14:01:34,"Evan" 写道:
>flinksql 貌似是目前做不到你说的这样
>
>
>
>
>发件人: air23
>发送时间: 2021-01-06 12:29
>收件人: user-zh
>主题: flink sql消费kafka sink到mysql问题
>你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了
你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了
然后再重启 发现报错的数据 会丢失
采用的scan.startup.mode' = 'group-offsets'
按理说 不是要重新消费 失败的那条数据 开始消费吗?
请问如何配置 可以不丢失数据
CREATE TABLE source1 (
id BIGINT ,
username STRING ,
password STRING ,
AddTime TIMESTAMP ,
origin_table STRING METADATA FROM
你好 这边使用flink sql有如下问题;
CREATE TABLE source1 (
id BIGINT ,
username STRING ,
password STRING ,
AddTime TIMESTAMP ,
origin_table STRING METADATA FROM 'value.table' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'plink_canal',
'properties.bootstrap.servers' = '',
在官方文档上 看到 canal-json format 有如下两个key
canal-json.database.include
canal-json.table.include
这2个key 看源码是equals等于关系,而不是include包含关系 ,是否这2个字段可以配置正则表达式来包含配置的表 来支持呢,如
"'canal-json.table.include' = 'test*',"+
因为可能会用到分库分表的flink场景。
谢谢
是的 感觉国内canal用户应该比debezium用户多很多,谢谢支持
在 2020-11-27 11:40:55,"Leonard Xu" 写道:
>Hi, air23
>
>国内用户用Canal-json还是很多的,我建了个issue 来支持, 你可以关注下。
>
>> 你好 请问这个debezium-json 这个value.source.table 功能 在1.12的canal-json会实现吗,
>> 看到canal-json代码里面 好像是有这部
bezium/DebeziumJsonDecodingFormat.java#L136
>
>On Fri, 27 Nov 2020 at 09:49, air23 wrote:
>
>> 你好 我这边有很多这种场景,把分库分表的binlog 写入到同一个topic 然后用canal-json解析,这边想获取到json里面的table
>> 字段,
>>
>> 然后根据 table名称加主键 写入到下游 合成一张表,写入到下游表,
>>
>> 然后发现 canal-json 是获取不到表名的,然后这边去修改c
你好 我这边有很多这种场景,把分库分表的binlog 写入到同一个topic 然后用canal-json解析,这边想获取到json里面的table 字段,
然后根据 table名称加主键 写入到下游 合成一张表,写入到下游表,
然后发现 canal-json 是获取不到表名的,然后这边去修改canal-json的format。
添加 createJsonRowType方法的DataTypes.FIELD("table", DataTypes.STRING())
然后在deserialize方法里面把 table字段set到 data里面去。但是发现这种好像是不成功的 ,请问下
但是 在yarn上跑的spark 任务 都是可以看到错误日志的, flink这边配置的是log4j的日志文件,本地运行 控制台是可以看到错误原因 和日志
在 2020-11-20 17:53:03,"caozhen" 写道:
>
>1、jobmanager的日志有没有错误呢?
>2、或者通过yarn history查下日志 yarn logs -applicationId xxx
>3、如果是flink client 提交作业可以看下客户端日志
>
>
>
>air23 wrote
>> 你好
&
yarn logs -applicationId xxx 和 yarn 历史log 都查看不到FAILED 错误日志。
在 2020-11-20 17:53:03,"caozhen" 写道:
>
>1、jobmanager的日志有没有错误呢?
>2、或者通过yarn history查下日志 yarn logs -applicationId xxx
>3、如果是flink client 提交作业可以看下客户端日志
>
>
>
>air23 wrote
>> 你好
>> flink on yarn
你好 主要流程 见附件
流程就是使用cdc 读取mysql。然后left join 维度表 ,最后写入到mysql
问题是测试的时候。update cdc的源表一条数据。发现结果的数据 有时候有 有时候没有,
使用connect=print 发现两条数据流。一个是delete 一个是insert,这边怀疑是乱序 导致先insert 在delete掉了,
把并行度设置为1的时候。就是正常的。如果沟通不方便 欢迎加钉钉13269166963。
在 2020-10-26 12:11:59,"史 正超" 写道:
>Hi, @ai
?? eventtime??
?? 2020-10-24 14:15:43??"pezynd" <284616...@qq.com> ??
>??Timestamp??Watermark
>
>
>
>
>----
>??:
>
这边源数据 就是用cdc读取mysql。cdc不会保证有序吗 ? Forword 这个是通过什么方式保证的? 谢谢你的回复
在 2020-10-26 05:37:45,"hailongwang" <18868816...@163.com> 写道:
>Hi air,
>保证内部是 Forword 试试,因为内部是 Hash 或者 Rebalance 的话,就会出现相同的数据的操作记录被不同的并发处理,这样到 sink
>时候就会出现乱序的可能。
>
>
>Best,
>Hailong Wang.
>
你好,
这边发现使用cdc读取mysql ,然后写入mysql会有乱序问题
在上游mysql update一条数据,connert=print是有一条delete 和一条insert的数据,
但是jdbc写入mysql时候 发现mysql有时候是正常的,但是有时候会没有,当把并行度改成1的时候是正常的。
这边怀疑是乱序了,先insert 再delete了。所以导致结果表 没有这条数据,请问flink sql 或者flink cdc 怎么保证有序。
大佬您好,mysql cdc 修改源表的数据,sink到目标表记录为空,但是再修改一次之后,目标表就有数据了。
例如:
insert into student_info
select a.id, a.name, b.name from a(id, name) left join b(id, class)
原始数据
a
id name
1 xm
b
id class
1 1班
所以
student_info 有一条数据
1, xm, 1班
此时
a表的name修改为 -> xh
student_info 的数据就变成
null, null, null
你好。我在sql client 配置的yarn yarn日志报错如下
2020-10-09 14:17:37,721 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
2020-10-09 14:17:37,726 INFO
>如果要抽取对应的 type,需要用 format = json, 把 json 的完整数据结构在 DDL 中声明出来(包括 type)。
>
>目前 canal-json 是自动将 changelog 转成了 Flink 的 insert/update/delete,而这个 change
>flag 目前是不对用户暴露的。
>
>Best,
>Jark
>
>On Fri, 25 Sep 2020 at 09:39, air23 wrote:
>
>> 你好
>> flink canal-json 如何获
你好
flink canal-json 如何获取每条数据是updata insert delete ,我ddl插件kafka表 用对应的type取 都是为null
这个操作类型 有办法取到吗?谢谢
你好。我有一个接kafka 写入tidb的任务 为什么会启动2个yarn任务去运行呢?
我是先用datastream 接入kafka。然后转成table sql写入到tidb
2个job name 一个叫Env.execute配置的名字
一个是叫insert 写入tidb的sql语句名字
这个需要单独 导入到服务器lib下面吗
我的本地pom文件 是有引入的
在 2020-08-27 15:32:49,"Lin Hou" 写道:
>jdbc connector jar没有导入线上集群,去官网上下载或者自己编译吧
>
>air23 于2020年8月27日周四 下午3:11写道:
>
>> 你好 我用idea本地运行的jdbc sink没有问题。但是在服务器上运行报错如下
>>
>>
>> 麻烦帮忙看下什么问题 是1.1
你好 我用idea本地运行的jdbc sink没有问题。但是在服务器上运行报错如下
麻烦帮忙看下什么问题 是1.11 版本的
'connector'='jdbc'
'password'=''
'sink.buffer-flush.interval'='10s'
'sink.buffer-flush.max-rows'='500'
'table-name'='flink_test3'
'url'='jdbc:mysql://**:4000/test'
'username'='root'
at
Limitations & Future
The current SQL Client only supports embedded mode. In the future, the
community plans to extend its functionality by providing a REST-based SQL
Client Gateway, see more in FLIP-24 and FLIP-91.
你好 在官方文档上看到了。请问这个还在计划中。是一种rest sql web客户端吗?
你好 谢谢。去掉版本号 确实可以了。我用的版本 和我安装的hive版本是一致的。不知道是什么原因导致的。
在 2020-08-05 15:59:06,"wldd" 写道:
>hi:
>1.你可以看下你配置hive catalog时的hive版本和你当前使用的hive版本是否一致
>2.你也可以尝试在配置hive catalog的时候,不设置hive版本
>
>
>
>
>
>
>
>
>
>
>
>
>
>--
>
你好
15:33:59,781 INFO org.apache.flink.table.catalog.hive.HiveCatalog
- Created HiveCatalog 'myhive1'
Exception in thread "main"
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create
Hive Metastore client
at
hi 你好
我这边集群是cdh的。 配置了hdfs ha模式
在使用 kafka sink 到hive 时候找不到nameservices
java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservices1
请问 在ha模式下 应该怎么配置
是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
>1.12 中已经支持读取复杂结构为 string 类型了。
>
>Best,
>Jark
>
>On Tue, 28 Jul 2020 at 15:36, air23 wrote:
>
>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>
>>
>> {
>> "data":[
你好 使用的是1.11.1版本的
在 2020-07-28 16:02:30,"明启 孙" <374060...@qq.com> 写道:
>你的flink什么版本
>
>发送自 Windows 10 版邮件应用
>
>发件人: air23
>发送时间: 2020年7月28日 15:36
>收件人: user-zh@flink.apache.org
>主题: Re:Re: Re: 解析kafka的mysql binlog问题
>
>格式如下 是canal解析的b
t;
>
>> 2020年7月28日 下午3:35,air23 写道:
>>
>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>
>>
>> {
>>"data":[
>>{
>>"op_id":"97037138",
>>"order_i
:"",
"sqlType":{
"op_id":4,
"order_id":4
},
"table":"order_product",
"ts":1595720375837,
"type":"INSERT"
}
在 2020-07-28 14:44:35,"Jark Wu" 写道:
>有
nventory_batch_log
,order_log
,order_address_book
,product_inventory
,order_physical_relation
,bil_business_attach
,picking_detail
,picking_detail
,orders
另外再问个问题。1.11版本 blink 不能datastream转table吗?
看到例子都是useOldPlanner 来转table的。
致谢
在 2020-07-27 19:44:10,"Jark Wu" 写道:
>抱歉,还是没有看到附件
我再上传一次
在2020年07月27日 18:55,Jark Wu 写道:
Hi,
你的附件好像没有上传。
On Mon, 27 Jul 2020 at 18:17, air23 wrote:
> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>
你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?
private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
" `data` VARCHAR , " +
" `table` VARCHAR " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'order_source'," +
" 'properties.bootstrap.servers' =
你好
hi
就是我用
flink sql 通过ddl读取和写入kafka怎么设置并行度呢?
flink sql 通过ddl写入kafka怎么自定义分区呢?
这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置 或者做自定义分区。
分钟级别定时去获取metrics?
这样吗
在 2020-06-02 14:05:39,"阿华田" 写道:
>这种情况需要对flink任务进行监控 获取flink的任务状态
>
>
>| |
>阿华田
>|
>|
>a15733178...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2020年06月2日 14:03,air23 写道:
>今天发现taskmanagers报json解析失败 他一起在重启
>但是我们这边是监控y
今天发现taskmanagers报json解析失败 他一起在重启
但是我们这边是监控yarn 任务级别的。像这种task 里面报错。yarn任务又不会挂掉。应该怎么去做监控。才能得知 程序后台有问题
谢谢
unt By Java");
}
我现在加了flink环境变量 这个例子 可以过了。就很奇怪
在 2020-05-29 14:22:39,"tison" 写道:
>然后你 execute 前后的代码片段甚至整个 main 如果可以的话通过 gist 贴一下(x)
>
>Best,
>tison.
>
>
>tison 于2020年5月29日周五 下午2:21写道:
>
>> 这个问题好诡异啊,一般来说编译会在 env.execute
>> 的时候拦
in
hdfs://ZONGTENGSERIVCE/user/root/.flink/application_1590715263014_0033.
在 2020-05-29 14:21:39,"tison" 写道:
>这个问题好诡异啊,一般来说编译会在 env.execute
>的时候拦截,不应该真的调度起来才对。你能详细描述一下你提交作业的方法还有这个错误报在哪里吗(client?cluster?)?
>
>Best,
>tison.
>
>
>air23 于2020年5月29日周五 下午
cdh运行flink1.10 on cdh yarn 报错如下。 用1.7.2版本就没有问题
flink-shaded-hadoop-2-uber-2.6.5-10.0.jar 也加了
hadoop环境变量 export HADOOP_CONF_DIR=/etc/hadoop/conf
求解答
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error:
可以看下你的HADOOP_CONF吗。我的配置的=/etc/hadoop/conf。
开源的Hadoop版本 这个也放了
在 2020-05-28 09:36:10,"wangweigu...@stevegame.cn"
写道:
>
>确实,你只要配置好CDH的HADOOP_CONF环境变量,同时下载开源的Hadoop版本(和CDH版本相同)放到flink
> lib下,就可以访问CDH yarn,提交作业!
>
>目前我这边是CDH 5.16.1,Flink 1.10,提交Flink on
2020-05-28 16:54:23,867 INFO
org.apache.hadoop.hbase.client.AsyncRequestFutureImpl - id=2,
table=GC_SCHEM:mon1, attempt=7/16, failureCount=427ops, last
exception=org.apache.hadoop.hbase.RegionTooBusyException:
org.apache.hadoop.hbase.RegionTooBusyException: Above memstore limit,
47 matches
Mail list logo