??100,0100??100
0,100??(??100)???
-- --
??:
Hi, HiveTableSource默认会根据数据大小自行分配并发,所以和你设置的最大并发冲突了,你可以设置table. exec. hive.
infer-source-parallelism: false来关闭这个功能 Best, Faaron Zheng 在2020年09月04日 15:29,me
写道: val tableConfig = tableEnv.getConfig.getConfiguration
tableConfig.setString("table.exec.resource.default-parallelism","4")
已经加了table的并行度设
Hi, sql-client目前应该是没有这个能力的,它是交互式执行的,我们之前在sql-client的基础上改过一个类似beeline
-e/-f的脚本,主要修改的提交任务的地方。 Best, Faaron Zheng 在2020年09月04日 17:04,LittleFall 写道:
我有一个 sql 文件,它里面有不少 flink sql 的创建表的语句和查询语句,现在我想要通过 sql client
提交这些任务,却只能一句一句的复制粘贴。如果同时复制粘贴多条语句就会报错,在 flink sql client 中使用 source xxx.sql
也会报错。 请问用什么样的方
换句话说,写进kafka的数据是同一个用户的两条,余额分别是0和100,就是我之前那个计算过程,这个计算操作在Oracle完成,我只负责把余额,也就是0或者100更新到kudu对应的余额字段,因为消费数据可能会乱序,先更新100,再更新0,这样导致存在kudu的数据是0,正确的数据最终应该是100
---原始邮件---
发件人: "wwj"
hi, all??
DataStream APIkafka??DataStream ds1??
tableEnvhive catalog??
tableEnv.registerCatalog(catalogName, catalog);
tableEnv.useCatalog(catalogName);
??ds1??table
Table sourcetable = tableEnv.fromDataStrea
??
??StreamTableEnvironment.from("")
??package kafka;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org
两个方法
1. kafka 里面可以 keyby, partition 里面都是有序的, 所以每个用户处理都是有序的
2. 就是你说的在 flink 里面做乱序处理
宁吉浩 于2020年9月4日周五 下午5:56写道:
> 我也遇到了和你一样的问题,也是两条数据有因果关系,必须有严格的先后顺序,我这边的业务不像银行那么严格;
> 我的解决办法是把迟到数据丢弃,然后进行业务计算;
> 另起一个程序把数据缓存在内存里,对数据排序,然后再度修正计算;
> 之前的实时+离线数仓用的办法,代码开发一次,但还要跑两次;
>
>
> -
Hi, all
当指定partition的时候这个问题通过path 也没法解决了
CREATE TABLE MyUserTable (
column_name1 INT,
column_name2 STRING, dt string,) PARTITIONED BY (dt) WITH (
'connector' = 'filesystem', -- required: specify the connector
'path' = 'file:///path/to/whatever', -- required: path to a director
想尝试采用flink-cdc 来清洗数据,但是尝试下来,困于几点:
① 使用sql-client 开启checkpoint ,如果程序挂掉,该如何接着checkpoint,继续执行相应程序。尤其是在执行group
by或者是count等操作时该如何办?
② 如果以上方式不行,是否可以采用写代码的形式,重启时指定checkpoint,但还是采用flink-cdc的方式去消费
| |
引领
|
|
yrx73...@163.com
|
签名由网易邮箱大师定制
hi, all
我这边用flink sql client 创建表的时候
CREATE TABLE MyUserTable (
column_name1 INT,
column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH (
'connector' = 'filesystem', -- required: specify the connector
'path' = 'file:///path/to/whatever', -- required: path to a direct
我也遇到了和你一样的问题,也是两条数据有因果关系,必须有严格的先后顺序,我这边的业务不像银行那么严格;
我的解决办法是把迟到数据丢弃,然后进行业务计算;
另起一个程序把数据缓存在内存里,对数据排序,然后再度修正计算;
之前的实时+离线数仓用的办法,代码开发一次,但还要跑两次;
--
发件人:smq <374060...@qq.com>
发送时间:2020年9月4日(星期五) 17:35
收件人:wwj ; user-zh
主 题:回复:消费kafka数据乱序问
??
Flink+drools drools
2020-9-4
| |
|
|
hold_li...@163.com
|
??
??2020??8??6?? 10:26??samuel@ubtrobot.com ??
flink
,??
可否发下是哪个配置,有相关的文档吗?
superainbower 于2020年9月4日周五 下午5:24写道:
> 1.11的版本已经加入了 新的配置项,避免了数据倾斜导致某个分区没数据 而不触发计算的问题
>
>
> | |
> superainbower
> |
> |
> superainbo...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年09月4日 15:11,taochanglian 写道:
> 确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数
1.11的版本已经加入了 新的配置项,避免了数据倾斜导致某个分区没数据 而不触发计算的问题
| |
superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制
在2020年09月4日 15:11,taochanglian 写道:
确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。
举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key
hash,数据只进入1个partition,就不会触发计算。
我有一个 sql 文件,它里面有不少 flink sql 的创建表的语句和查询语句,现在我想要通过 sql client
提交这些任务,却只能一句一句的复制粘贴。如果同时复制粘贴多条语句就会报错,在 flink sql client 中使用 source xxx.sql
也会报错。
请问用什么样的方法可以一次性执行多条语句呢?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
大家好
现在碰到一个关于处理乱序的问题,业务场景是银行余额的更新,数据源是kafka,有一个账户余额字段,sink到kudu,更新客户余额.
如果对于同一个账户的多笔操作出现乱序,可能会导致客户余额不对。比如一个客户账户有100块,先消费100,然后存100,按照正常的数据处理顺序,帐户余额应该还是100的,假如说先处理了存钱,存上之后余额是100-100+100=100,然后处理消费100的那条数据,由于消费在前,这时这条数据对应的余额是100-100=0。
这样的话就出现了统计余额错误,请问想要按照事件时间处理数据是不是可以先keyby,然后用watermark.
Hi,
推荐你使用ddl来声明你上下游用的connector
```
table_env.execute_sql("""
CREATE TABLE output (
data STRING ARRAY
) WITH (
'connector' = 'filesystem', -- required: specify the connector
'path' = 'file:///tmp/test.csv', -- required: path to a directory
'format' = 'json',
'json.fail-on-missing-fi
您好,我是想让输出insert_into到目标表中,具体如下:
st_env=StreamExecutionEnvironment.get_execution_environment()
st_env.connect了一个source table(table包含a字段),
然后
| st_env.connect(FileSystem().path('tmp')) \ |
| | .with_format(OldCsv() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
| | .with_schema(Schema(
您好,我是想让输出insert_into到目标表中,具体如下:
st_env=StreamExecutionEnvironment.get_execution_environment()
st_env.connect了一个source table(table包含a字段),
然后
| st_env.connect(FileSystem().path('tmp')) \ |
| | .with_format(OldCsv() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
| | .with_schema(Schema(
val tableConfig = tableEnv.getConfig.getConfiguration
tableConfig.setString("table.exec.resource.default-parallelism","4")
已经加了table的并行度设置,但是提示小于104并行度不让执行
Vertex Source: HiveTableSource()'s parallelism (104) is higher than the max
parallelism (4). Please lower the parallelism or increase the
我的代码中使用flinksql,本机idea中测试没有问题,放到集群上跑之后,占用了全部40个slots,并且并行度为114,程序也一直卡着没有输出,是一个加入了时间窗口的flink
sql
我再代码中全局设置了,dataStreamEnv.setParallelism(4)
dataStreamEnv.setMaxParallelism(4)
但是感觉完全不起作用,请问怎么去限制flink sql的并行度?
Hi,
你是调试的时候想看结果吗?
你可以直接table.to_pandas()来看结果,或者用print connector来看。
个人觉得to_pandas最简单,比如你可以试试下面的例子
```
table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
@udf(input_types=DataTypes.STRING(),
result_type=DataTypes.ARRAY(DataTypes.STRING()))
def func(a):
return np.array([a, a,
确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。
举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key
hash,数据只进入1个partition,就不会触发计算。后来让这个key轮询到10个parition都有数据,就可以触发计算了。
在 2020/9/4 13:14, Benchao Li 写道:
如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。
要处理这种情况,可以了解下idle
23 matches
Mail list logo