with (
'connector'='kafka',
'topic'='cloud_behavior;cloud_behavior_other;cloud_behavior_qxb;cloud_behavior_cc;cloud_behavior_cs',
'properties.bootstrap.servers'='',
'properties.group.id'='flink_2_hive_and_imei_ncrypy_test',
'format'='avro',
'scan.startup.mode'='group-offsets'
现在就可以使用,刚查了一下,你可以参考下ShengKai的
The config option `topic` and `topic-pattern` specifies the topics or topic
pattern to consume for source. The config option `topic` can accept topic list
using semicolon separator like 'topic-1;topic-2'.
好的 那我尝试下通过KafkaDynamicTableFactory来实现
--
Sent from: http://apache-flink.147419.n8.nabble.com/
目前应该没有直接可以使用的方式,但是可以通过重写KafkaDynamicTableFactory来实现你要的结果,不知道社区在之后有没有考虑加上topic-separator
--
发件人:奔跑的小飞袁
发送时间:2020年10月26日(星期一) 14:23
收件人:user-zh
主 题:Re: 回复:flinksql指定kafka多topic
有没有一种更加友好的方式 使用topic-pattern的话在提供给非开发人员使用成本太高
--
Se
有没有一种更加友好的方式 使用topic-pattern的话在提供给非开发人员使用成本太高
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,可以试试用topic-pattern
--
发件人:奔跑的小飞袁
发送时间:2020年10月26日(星期一) 14:08
收件人:user-zh
主 题:flinksql指定kafka多topic
hello,
我想问一下目前flinksql支持同时指定kafka的多topic吗,例如
'topic'='cloud_behavior,cloud_behavior_other,cloud_behavior_qxb,cloud_behavior_cc,c
hello,
我想问一下目前flinksql支持同时指定kafka的多topic吗,例如
'topic'='cloud_behavior,cloud_behavior_other,cloud_behavior_qxb,cloud_behavior_cc,cloud_behavior_cs'
--
Sent from: http://apache-flink.147419.n8.nabble.com/
??
batchsize??
-- --
??:
Hi, @air23, 你能提供下完整的sql吗?,我来复现下这个问题
发件人: air23
发送时间: 2020年10月23日 6:21
收件人: user-zh@flink.apache.org
主题: Flink mysqlCDC ,然后jdbc sink 到mysql 乱序问题
你好,
这边发现使用cdc读取mysql ,然后写入mysql会有乱序问题
在上游mysql update一条数据,connert=print是有一条delete 和一条insert的数据,
但是jdbc写入mysql时候 发现mysql
?? eventtime??
?? 2020-10-24 14:15:43??"pezynd" <284616...@qq.com> ??
>??Timestamp??Watermark
>
>
>
>
>-- --
>??:
>
?? cdc joinmysql
?? 2020-10-24 14:15:43??"pezynd" <284616...@qq.com> ??
>??Timestamp??Watermark
>
>
>
>
>-- --
>??:
>
就是一个update 操作。就是乱序了 。
在 2020-10-23 19:12:20,"熊云昆" 写道:
>你的数据有没有时间属性?可以用时间来判断吧
>
>
>| |
>熊云昆
>|
>|
>邮箱:xiongyun...@163.com
>|
>
>签名由 网易邮箱大师 定制
>
>在2020年10月23日 14:21,air23 写道:
>你好,
>这边发现使用cdc读取mysql ,然后写入mysql会有乱序问题
>在上游mysql update一条数据,connert=print是有一条delete 和一条insert的数据,
>但是jdbc写入mysql时候 发现mysql有时
这边源数据 就是用cdc读取mysql。cdc不会保证有序吗 ? Forword 这个是通过什么方式保证的? 谢谢你的回复
在 2020-10-26 05:37:45,"hailongwang" <18868816...@163.com> 写道:
>Hi air,
>保证内部是 Forword 试试,因为内部是 Hash 或者 Rebalance 的话,就会出现相同的数据的操作记录被不同的并发处理,这样到 sink
>时候就会出现乱序的可能。
>
>
>Best,
>Hailong Wang.
>
>
>
>
>在 2020-10-23 13:21:19,"air23" 写道:
>
知道问题所在了,那个在配置文件中设置state.checkpoints.num-retained是生效的
在webui,任务checkpoint的history中总是显示10条最新的记录(我以为就是一定是保留了最新的10份数据),
但是其实真正持久化有数据的数目是根据state.checkpoints.num-retained的值
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,
你似乎没有把你的3个图给成功贴上来。如果是之前你抛出来的那个代码和错误的话,就像hailong解释的,是你的读入的csv数据源的数据的第一列数据有的不是Long导致的,你需要检查一下你的数据内容。还有你那个arrow的batchsize设置成2也太小了点,默认情况下是1,其实大部分情况你是不需要去设置这个值的。
,
Best,
Xingbo
pyflink??
1.png??2.jpg??3.png??
关于flink1.11 新版本使用dataStreamEnv.execute 和tableEnv.execute出现No operators defined
in streaming topology. Cannot generate StreamGraph.的问题
-| 程序内部使用Table API同时也有 Table转为 Datastream的场景。
-|程序内部有使用flinksql 读写kafka,从而执行 sqlUpdate
尝试使用新版api 只使用tableEnv.executeSql 从而不加 dataStreamEnv.execute
关于flink1.11 新版本使用dataStreamEnv.execute 和tableEnv.execute出现No operators defined
in streaming topology. Cannot generate StreamGraph.的问题
-| 程序内部使用Table API同时也有 Table转为 Datastream的场景。
-|程序内部有使用flinksql 读写kafka,从而执行 sqlUpdate
尝试使用新版api 只使用tableEnv.executeSql 从而不加 dataStreamEnv.execute 和tableE
谢谢~ 我自已眼拙啦
在2020年10月25日 21:19,Natasha<13631230...@163.com> 写道:
hi 社区,
我想请问下如果我用第二种红色方框内的写法跟第一种红色方框有什么不一样吗?两种方式执行的程序结果是不一样的,但是不明白为什么不同
第一个是TumblingWindow,第二个是SlidingWindow
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/operators/windows.html#tumbling-windows
Best Regards
On Sun, Oct 25, 2020 at 9:20 PM Natasha <13631230...@163.com> wrote:
> hi 社区,
> 我想请问下如果我用第二种红色方框内的写法跟第一种红色方框有什么不一样吗?两种方式执行的
Hi air,
保证内部是 Forword 试试,因为内部是 Hash 或者 Rebalance 的话,就会出现相同的数据的操作记录被不同的并发处理,这样到 sink
时候就会出现乱序的可能。
Best,
Hailong Wang.
在 2020-10-23 13:21:19,"air23" 写道:
>你好,
>这边发现使用cdc读取mysql ,然后写入mysql会有乱序问题
>在上游mysql update一条数据,connert=print是有一条delete 和一条insert的数据,
>但是jdbc写入mysql时候 发现mysql有时候是正常的,但是有时候会没有
Hi,
根据你提供的堆栈,真正报错的堆栈如下:
`Parsing error for column 1 of row '锘�1,98' originated by LongParser:
NUMERIC_VALUE_ILLEGAL_CHARACTER.`
这是因为不能将数据转换为 long 类型,故你可以对应字段定义为 varchar。
Best,
Hailong Wang.
在 2020-10-25 17:53:29,"洗你的头" <1264386...@qq.com> 写道:
>尊敬的开发者您好:我在新使用pyflink时,跑通了简单的单词统计的例子,但是在运行求和的例子时报错了,
hi all:
flink 启动报错,调整了配置classloader.resolve-order: parent-first 也没用效果,请问该问题如何排查?
2020-10-25 22:14:00
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
instantiate user function.
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java
hi 社区,
我想请问下如果我用第二种红色方框内的写法跟第一种红色方框有什么不一样吗?两种方式执行的程序结果是不一样的,但是不明白为什么不同
pyflink??
1.??
from pyflink.table import StreamTableEnvironment, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.t
Hi
看起来这个query应该是没问题的,语法也是flink支持的,能贴点结果数据吗?可以简化下query能复现就行。
祝好
Leonard
Hi
> 在 2020年10月21日,16:27,marble.zh...@coinflex.com.invalid
> 写道:
>
> select marketCode as market_code,
> first_value(matchedPrice) over (partition by marketCode ORDER BY
> transTime) as vopen
> from TickData
> where action = 'OrderMatched' and side = 'BUY'
> group by marketCode, HOP
27 matches
Mail list logo