?????? ????kafka????????????

2020-09-04 文章 ????????
??100,0100??100 0,100??(??100)??? --  -- ??:

回复:关于flink sql并行度问题的请教

2020-09-04 文章 faaron zheng
Hi, HiveTableSource默认会根据数据大小自行分配并发,所以和你设置的最大并发冲突了,你可以设置table. exec. hive. infer-source-parallelism: false来关闭这个功能 Best, Faaron Zheng 在2020年09月04日 15:29,me 写道: val tableConfig = tableEnv.getConfig.getConfiguration tableConfig.setString("table.exec.resource.default-parallelism","4") 已经加了table的并行度设

回复:flink sql client 如何同时执行多条 sql 语句

2020-09-04 文章 faaron zheng
Hi, sql-client目前应该是没有这个能力的,它是交互式执行的,我们之前在sql-client的基础上改过一个类似beeline -e/-f的脚本,主要修改的提交任务的地方。 Best, Faaron Zheng 在2020年09月04日 17:04,LittleFall 写道: 我有一个 sql 文件,它里面有不少 flink sql 的创建表的语句和查询语句,现在我想要通过 sql client 提交这些任务,却只能一句一句的复制粘贴。如果同时复制粘贴多条语句就会报错,在 flink sql client 中使用 source xxx.sql 也会报错。 请问用什么样的方

回复:消费kafka数据乱序问题

2020-09-04 文章 smq
换句话说,写进kafka的数据是同一个用户的两条,余额分别是0和100,就是我之前那个计算过程,这个计算操作在Oracle完成,我只负责把余额,也就是0或者100更新到kudu对应的余额字段,因为消费数据可能会乱序,先更新100,再更新0,这样导致存在kudu的数据是0,正确的数据最终应该是100 ---原始邮件--- 发件人: "wwj"

flink ???? StreamingFileSink ??catalog??????????????

2020-09-04 文章 MuChen
hi, all?? DataStream APIkafka??DataStream ds1?? tableEnvhive catalog?? tableEnv.registerCatalog(catalogName, catalog); tableEnv.useCatalog(catalogName); ??ds1??table Table sourcetable = tableEnv.fromDataStrea

1.11????????????????????????????????????????????????????????????????

2020-09-04 文章 Asahi Lee
?? ??StreamTableEnvironment.from("") ??package kafka; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org

Re: 消费kafka数据乱序问题

2020-09-04 文章 Xiao Xu
两个方法 1. kafka 里面可以 keyby, partition 里面都是有序的, 所以每个用户处理都是有序的 2. 就是你说的在 flink 里面做乱序处理 宁吉浩 于2020年9月4日周五 下午5:56写道: > 我也遇到了和你一样的问题,也是两条数据有因果关系,必须有严格的先后顺序,我这边的业务不像银行那么严格; > 我的解决办法是把迟到数据丢弃,然后进行业务计算; > 另起一个程序把数据缓存在内存里,对数据排序,然后再度修正计算; > 之前的实时+离线数仓用的办法,代码开发一次,但还要跑两次; > > > -

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-04 文章 Peihui He
Hi, all 当指定partition的时候这个问题通过path 也没法解决了 CREATE TABLE MyUserTable ( column_name1 INT, column_name2 STRING, dt string,) PARTITIONED BY (dt) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///path/to/whatever', -- required: path to a director

sql-client checkpoint sql-client

2020-09-04 文章 引领
想尝试采用flink-cdc 来清洗数据,但是尝试下来,困于几点: ① 使用sql-client 开启checkpoint ,如果程序挂掉,该如何接着checkpoint,继续执行相应程序。尤其是在执行group by或者是count等操作时该如何办? ② 如果以上方式不行,是否可以采用写代码的形式,重启时指定checkpoint,但还是采用flink-cdc的方式去消费 | | 引领 | | yrx73...@163.com | 签名由网易邮箱大师定制

flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-04 文章 Peihui He
hi, all 我这边用flink sql client 创建表的时候 CREATE TABLE MyUserTable ( column_name1 INT, column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///path/to/whatever', -- required: path to a direct

回复:消费kafka数据乱序问题

2020-09-04 文章 宁吉浩
我也遇到了和你一样的问题,也是两条数据有因果关系,必须有严格的先后顺序,我这边的业务不像银行那么严格; 我的解决办法是把迟到数据丢弃,然后进行业务计算; 另起一个程序把数据缓存在内存里,对数据排序,然后再度修正计算; 之前的实时+离线数仓用的办法,代码开发一次,但还要跑两次; -- 发件人:smq <374060...@qq.com> 发送时间:2020年9月4日(星期五) 17:35 收件人:wwj ; user-zh 主 题:回复:消费kafka数据乱序问

??????????????flink??????????????????

2020-09-04 文章 ????
?? Flink+drools drools 2020-9-4 | | | | hold_li...@163.com | ?? ??2020??8??6?? 10:26??samuel@ubtrobot.com ?? flink ,??

Re: 请指教一个关于时间窗的问题,非常感谢!

2020-09-04 文章 zilong xiao
可否发下是哪个配置,有相关的文档吗? superainbower 于2020年9月4日周五 下午5:24写道: > 1.11的版本已经加入了 新的配置项,避免了数据倾斜导致某个分区没数据 而不触发计算的问题 > > > | | > superainbower > | > | > superainbo...@163.com > | > 签名由网易邮箱大师定制 > > > 在2020年09月4日 15:11,taochanglian 写道: > 确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数

回复: 请指教一个关于时间窗的问题,非常感谢!

2020-09-04 文章 superainbower
1.11的版本已经加入了 新的配置项,避免了数据倾斜导致某个分区没数据 而不触发计算的问题 | | superainbower | | superainbo...@163.com | 签名由网易邮箱大师定制 在2020年09月4日 15:11,taochanglian 写道: 确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。 举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key hash,数据只进入1个partition,就不会触发计算。

flink sql client 如何同时执行多条 sql 语句

2020-09-04 文章 LittleFall
我有一个 sql 文件,它里面有不少 flink sql 的创建表的语句和查询语句,现在我想要通过 sql client 提交这些任务,却只能一句一句的复制粘贴。如果同时复制粘贴多条语句就会报错,在 flink sql client 中使用 source xxx.sql 也会报错。 请问用什么样的方法可以一次性执行多条语句呢? -- Sent from: http://apache-flink.147419.n8.nabble.com/

消费kafka数据乱序问题

2020-09-04 文章 smq
大家好    现在碰到一个关于处理乱序的问题,业务场景是银行余额的更新,数据源是kafka,有一个账户余额字段,sink到kudu,更新客户余额. 如果对于同一个账户的多笔操作出现乱序,可能会导致客户余额不对。比如一个客户账户有100块,先消费100,然后存100,按照正常的数据处理顺序,帐户余额应该还是100的,假如说先处理了存钱,存上之后余额是100-100+100=100,然后处理消费100的那条数据,由于消费在前,这时这条数据对应的余额是100-100=0。   这样的话就出现了统计余额错误,请问想要按照事件时间处理数据是不是可以先keyby,然后用watermark.

Re: Re: Re: Re: pyflink-udf 问题反馈

2020-09-04 文章 Xingbo Huang
Hi, 推荐你使用ddl来声明你上下游用的connector ``` table_env.execute_sql(""" CREATE TABLE output ( data STRING ARRAY ) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///tmp/test.csv', -- required: path to a directory 'format' = 'json', 'json.fail-on-missing-fi

Re:Re: Re: Re: pyflink-udf 问题反馈

2020-09-04 文章 whh_960101
您好,我是想让输出insert_into到目标表中,具体如下: st_env=StreamExecutionEnvironment.get_execution_environment() st_env.connect了一个source table(table包含a字段), 然后 | st_env.connect(FileSystem().path('tmp')) \ | | | .with_format(OldCsv() | | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ | | | .with_schema(Schema(

Re:Re: Re: Re: pyflink-udf 问题反馈

2020-09-04 文章 whh_960101
您好,我是想让输出insert_into到目标表中,具体如下: st_env=StreamExecutionEnvironment.get_execution_environment() st_env.connect了一个source table(table包含a字段), 然后 | st_env.connect(FileSystem().path('tmp')) \ | | | .with_format(OldCsv() | | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ | | | .with_schema(Schema(

回复:关于flink sql并行度问题的请教

2020-09-04 文章 me
val tableConfig = tableEnv.getConfig.getConfiguration tableConfig.setString("table.exec.resource.default-parallelism","4") 已经加了table的并行度设置,但是提示小于104并行度不让执行 Vertex Source: HiveTableSource()'s parallelism (104) is higher than the max parallelism (4). Please lower the parallelism or increase the

关于flink sql并行度问题的请教

2020-09-04 文章 me
我的代码中使用flinksql,本机idea中测试没有问题,放到集群上跑之后,占用了全部40个slots,并且并行度为114,程序也一直卡着没有输出,是一个加入了时间窗口的flink sql 我再代码中全局设置了,dataStreamEnv.setParallelism(4) dataStreamEnv.setMaxParallelism(4) 但是感觉完全不起作用,请问怎么去限制flink sql的并行度?

Re: Re: Re: pyflink-udf 问题反馈

2020-09-04 文章 Xingbo Huang
Hi, 你是调试的时候想看结果吗? 你可以直接table.to_pandas()来看结果,或者用print connector来看。 个人觉得to_pandas最简单,比如你可以试试下面的例子 ``` table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b']) @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ARRAY(DataTypes.STRING())) def func(a): return np.array([a, a,

Re: 回复:请指教一个关于时间窗的问题,非常感谢!

2020-09-04 文章 taochanglian
确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。 举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key hash,数据只进入1个partition,就不会触发计算。后来让这个key轮询到10个parition都有数据,就可以触发计算了。 在 2020/9/4 13:14, Benchao Li 写道: 如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。 要处理这种情况,可以了解下idle