My suspicion is that somewhere in the path where it fails to connect to
ZooKeeper, the exception is swallowed, so instead of running the shutdown path
for when the job fails, the general shutdown path is taken.
This was fortunately a job for which we had a savepoint from yesterday.
Otherwise
Hi Cristian,
In the log, we can see it went to the method
shutDownAsync(applicationStatus, null, true);
```
2020-09-04 17:32:07,950 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting
StandaloneApplicationClusterEntryPoint down
```
----
Hi,
HiveTableSource infers its own parallelism from the data size by default, which conflicts with the max parallelism you set. You can set table.exec.hive.infer-source-parallelism: false to turn this off.
Best,
Faaron Zheng
On 2020-09-04 15:29, me wrote:
写道: val tableConfig = tableEnv.getConfig.getConfiguration
tableConfig.setString("table.exec.resource.default-parallelism","4")
Hi,
The sql-client doesn't currently have this capability; it executes statements interactively. We previously built a beeline-like -e/-f script on top of the sql-client, mainly by changing the job-submission code.
Best,
Faaron Zheng
On 2020-09-04 17:04, LittleFall wrote:
I have a SQL file containing quite a few Flink SQL CREATE TABLE statements and queries. I want to submit these jobs through the sql client, but I can only copy-paste them one statement at a time. Pasting multiple statements at once raises an error, and using source xxx.sql inside the flink sql client also errors.
Hello guys.
We run a stand-alone cluster that runs a single job (if you are familiar with
the way Ververica Platform runs Flink jobs, we use a very similar approach). It
runs Flink 1.11.1 straight from the official docker image.
Usually, when our jobs crash for any reason, they will resume
Hi Team,
I have a generic Question.
Let's say I have 2 actions to be taken on a Flink DataStream (Kafka):
1) Convert some data fields, and write to an external database
2) Transform the fields converted in #1 into a different record format, say Avro
*Here are two possible approaches.*
a) One
Hi all,
I added a CoGroup to my batch job, and it’s now running much slower, primarily
due to back pressure from the CoGroup operator.
I assume it’s because this operator is having to sort/buffer-to-disk all
incoming data. Looks like about 1TB from one side of the join, currently very
little
Hi
Could you check the log4j.properties or related conf file used to generate logs
to see if anything is unexpected?
You could also log in to the machine and use the command 'ps -ef | grep java' to grep
the command that runs the taskmanager, as Flink prints the place where
'taskmanager.log' is located [1] via
Thank you Gordon. I am still on 1.10.0. I will try 1.11.0 when I get there.
Regards,
Vijay
On Fri, Sep 4, 2020 at 12:52 AM Tzu-Li (Gordon) Tai
wrote:
> Hi,
>
> As far as I can tell from a recent change [1], this seems to be possible
> now starting from Flink 1.11.x. Have you already tried this
Hi Gordon,
We already use [1]. Unfortunately it doesn't allow us to detect out-of-core
specific bugs like this:
POJO v = myMapState.get(myKey);
v.setStatus(1);
return;
// missing myMapState.put(myKey, v);
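The failure mode in the snippet above can be reproduced outside Flink with a toy map state that copies values on access, the way a serializing backend such as RocksDB effectively does. This is a minimal sketch; `SerializingMapState` and `Pojo` are made-up names, not Flink API:

```python
import copy

class SerializingMapState:
    """Toy stand-in for a serializing state backend: get() hands back a
    detached copy, so mutating the returned object changes nothing in the
    backend unless put() writes it back."""
    def __init__(self):
        self._store = {}

    def get(self, key):
        # A real backend deserializes bytes on every access; a deep copy
        # models the same "detached object" behaviour.
        return copy.deepcopy(self._store.get(key))

    def put(self, key, value):
        self._store[key] = copy.deepcopy(value)

class Pojo:
    def __init__(self, status=0):
        self.status = status

state = SerializingMapState()
state.put("myKey", Pojo(status=0))

v = state.get("myKey")
v.status = 1  # mutate the detached copy
# missing state.put("myKey", v) -- the update never reaches the backend

print(state.get("myKey").status)  # 0: the write was silently lost
```

With a heap-based backend the object returned by get() is the stored object itself, so the same code appears to work, which is why such bugs tend to surface only after switching to RocksDB.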
Thanks,
Alexey
From: Tzu-Li (Gordon) Tai
Sent: Friday,
I have also checked the ports, and all ports from 0-65535 are open. I do not
even see any taskmanager.log being generated under my container
logs on the task machine.
On Fri, Sep 4, 2020 at 2:58 PM aj wrote:
>
> I am trying to switch to Flink 1.11 with the new EMR release 6.1. I have
>
In other words, the data written to Kafka is two records for the same user, with balances 0 and 100; that's the computation I described earlier. The computation itself happens in Oracle; I'm only responsible for updating the balance, 0 or 100, into the corresponding balance field in Kudu. Because the consumed data can arrive out of order, 100 may be applied first and then 0, leaving 0 stored in Kudu when the correct final value should be 100.
---- Original message ----
From: "wwj"
I am not sure at this point that the delay is caused by Flink. I would
rather suspect that it has something to do with an external system. Maybe
you could try profiling the job submission so that we see clearer where the
time is spent. Other than that, there might be some options for the GCS
hi, all,
I read Kafka with the DataStream API into a DataStream ds1, and register a Hive catalog on the tableEnv:
tableEnv.registerCatalog(catalogName, catalog);
tableEnv.useCatalog(catalogName);
How do I convert ds1 into a table?
Table sourcetable =
Can I use StreamTableEnvironment.from("")?
package kafka;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
Two approaches:
1. You can keyBy in Kafka; each partition is ordered, so processing per user is ordered.
2. As you said, handle the out-of-order data inside Flink.
On Fri, Sep 4, 2020 at 17:56, 宁吉浩 wrote:
> I ran into the same problem: two records with a causal relationship that require a strict order, though my business isn't as strict as banking;
> My workaround was to drop the late data and run the business computation;
> then run a separate program that buffers the data in memory, sorts it, and re-corrects the computation;
> the approach our earlier realtime + offline warehouse used: write the code once, but run it twice;
Hi, all
When a partition is specified, this problem can't be solved via the path option either.
CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem', -- required: specify the connector
  'path' = 'file:///path/to/whatever', -- required: path to a
Yes, we can try the same in 1.11. Meanwhile is there any network or threads
related config that we can tweak for this?
On Fri, Sep 4, 2020 at 12:48 PM Till Rohrmann wrote:
> From the log snippet it is hard to tell. Flink is not only interacting
> with GCS but also with ZooKeeper to store a
I want to try flink-cdc to clean data, but I'm stuck on a few points:
① Using the sql-client with checkpointing enabled: if the program dies, how do I resume from the checkpoint and continue the job, especially for operations such as GROUP BY or COUNT?
② If that doesn't work, can I instead write code and specify the checkpoint on restart, while still consuming with flink-cdc?
引领
yrx73...@163.com
hi, all
When I create a table with the flink sql client:
CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem', -- required: specify the connector
  'path' = 'file:///path/to/whatever', -- required: path to a
I ran into the same problem: two records with a causal relationship that require a strict order, though my business isn't as strict as banking;
My workaround was to drop the late data and run the business computation;
then run a separate program that buffers the data in memory, sorts it, and re-corrects the computation;
the approach our earlier realtime + offline warehouse used: write the code once, but run it twice;
--
From: smq <374060...@qq.com>
Sent: Friday, September 4, 2020 17:35
To: wwj ; user-zh
Subject
Flink + drools
2020-9-4
hold_li...@163.com
On 2020-08-06 10:26, samuel@ubtrobot.com wrote:
Could you share which config option that is? Is there any related documentation?
On Fri, Sep 4, 2020 at 17:24, superainbower wrote:
> The 1.11 release added a new config option that avoids the problem where data skew leaves a partition with no data and computation is never triggered
>
> superainbower
> superainbo...@163.com
>
> On 2020-09-04 15:11, taochanglian wrote:
>
I am trying to switch to Flink 1.11 with the new EMR release 6.1. I have
created a 3-node EMR cluster with Flink 1.11. When I run my job it works
fine; the only issue is that I cannot see any logs in the job manager
and task manager. I see the below exception in stdout of the job manager
The 1.11 release added a new config option that avoids the problem where data skew leaves a partition with no data and computation is never triggered
superainbower
superainbo...@163.com
On 2020-09-04 15:11, taochanglian wrote:
Indeed. If you have multiple partitions but only one of them has data, the watermark won't advance and computation is never triggered; you need data in every partition. For example, when I was testing with 1 topic and 10 partitions, since it was a test environment, by key
I have a SQL file containing quite a few Flink SQL CREATE TABLE statements and queries. I want to submit these jobs through the sql client, but I can only copy-paste them one statement at a time. Pasting multiple statements at once raises an error, and using source xxx.sql inside the flink sql client also errors.
Is there a way to execute multiple statements in one go?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi all,
I'm hitting an out-of-order processing problem. The business scenario is bank balance updates: the source is Kafka with an account-balance field, and the sink is Kudu, where the customer balance is updated.
If multiple operations on the same account arrive out of order, the balance can end up wrong. For example, a customer has 100 in their account, spends 100, then deposits 100. Processed in the normal order, the balance ends at 100. But if the deposit is processed first, the balance after it is 100 - 100 + 100 = 100; then the spend record is processed, and since the spend happened earlier it carries a balance of 100 - 100 = 0, leaving an incorrect final balance of 0.
To process the data in event-time order, can I first keyBy and then use watermarks?
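The buffer-and-reorder idea discussed in this thread can be sketched in a few lines of plain Python. This is a toy sketch, not Flink code; the event layout and the `replay_balances` name are assumptions. In Flink it would correspond to keyBy(account) plus event-time watermarks, emitting the balance of the latest event once the watermark has passed it:

```python
from collections import defaultdict

def replay_balances(events):
    """events: (account, event_time, balance_after) tuples, possibly out of
    order. Group by account, order by event time, and keep the balance
    carried by the latest event, so a late-arriving older record can no
    longer overwrite a newer one."""
    by_account = defaultdict(list)
    for account, ts, balance in events:
        by_account[account].append((ts, balance))
    # max() over (ts, balance) pairs picks the record with the latest ts.
    return {acct: max(recs)[1] for acct, recs in by_account.items()}

# The deposit (event time 2, balance 100) is consumed before the spend
# (event time 1, balance 0); ordering by event time still yields 100:
events = [("acct-1", 2, 100), ("acct-1", 1, 0)]
print(replay_balances(events))  # {'acct-1': 100}
```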
Hi,
On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley wrote:
> Hello!
>
> I've been digging into State Storage documentation, but it's left me
> scratching my head with a few questions. Any help will be much appreciated.
>
> Qs:
> 1. Is there a way to use RocksDB state backend for Flink on AWS EMR?
>
Hi,
I recommend using DDL to declare the connectors for your source and sink:
```
table_env.execute_sql("""
CREATE TABLE output (
data STRING ARRAY
) WITH (
'connector' = 'filesystem', -- required: specify the connector
'path' = 'file:///tmp/test.csv', -- required: path to a directory
'format' = 'json',
Hi, I want the output to be insert_into'd into a sink table, like this:
st_env = StreamExecutionEnvironment.get_execution_environment()
st_env.connect defined a source table (the table contains a field a),
then:
st_env.connect(FileSystem().path('tmp')) \
    .with_format(OldCsv()
        .field('res', DataTypes.ARRAY(DataTypes.STRING( \
Hi,
As far as I can tell from a recent change [1], this seems to be possible
now starting from Flink 1.11.x. Have you already tried this with the latest
Flink version?
Also including Klou in this email, who might be able to confirm this.
Cheers,
Gordon
[1]
Hi Alexey,
Is there a specific reason why you want to test against RocksDB?
Otherwise, in Flink tests we use a `KeyedOneInputStreamOperatorTestHarness`
[1] that allows you to wrap a user function and eliminate the need to worry
about setting up heavy runtime context / dependencies such as the
val tableConfig = tableEnv.getConfig.getConfiguration
tableConfig.setString("table.exec.resource.default-parallelism","4")
I've already added the table parallelism setting, but it complains that it won't run with parallelism lower than 104:
Vertex Source: HiveTableSource()'s parallelism (104) is higher than the max
parallelism (4). Please lower the parallelism or increase
Great to hear that it works on K8s, and thanks for letting us know that the
problem is likely caused by Minikube.
Cheers,
Till
On Fri, Sep 4, 2020 at 8:53 AM superainbower wrote:
> Hi Till & Yang,
> I can deploy Flink on kubernetes(not minikube), it works well
> So there are some problem about my
My code uses Flink SQL. It runs fine when tested locally in IDEA, but once deployed to the cluster it occupies all 40 slots with a parallelism of 114, and the program just hangs with no output; it's a Flink SQL job with a time window.
I set globally in the code:
dataStreamEnv.setParallelism(4)
dataStreamEnv.setMaxParallelism(4)
but it seems to have no effect at all. How can I limit the parallelism of Flink SQL?
Hi,
Are you trying to inspect the results while debugging?
You can call table.to_pandas() to see the result, or use the print connector.
Personally I find to_pandas simplest; for example you can try the following:
```
table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
@udf(input_types=DataTypes.STRING(),
result_type=DataTypes.ARRAY(DataTypes.STRING()))
def func(a):
return np.array([a,
Indeed. If you have multiple partitions but only one of them has data, the watermark won't advance and computation is never triggered; you need data in every partition.
For example, when I was testing with 1 topic and 10 partitions, since it was a test environment the data was hashed by key into a single partition, so computation was never triggered. After that key was rotated so all 10 partitions had data, computation fired.
On 2020/9/4 13:14, Benchao Li wrote:
If you have multiple partitions and some of them have no data, this problem does indeed exist.
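The mechanics behind the behaviour described above (one partition without data stalls everything) can be sketched without Flink: the downstream watermark is the minimum over the per-partition watermarks. `combined_watermark` is a made-up helper for illustration, not Flink API:

```python
def combined_watermark(partition_watermarks):
    """The downstream watermark is the minimum over all input partitions,
    mirroring how Flink combines per-partition watermarks."""
    return min(partition_watermarks)

# Ten partitions, but only partition 0 ever receives data: its watermark
# advances while the nine idle ones stay at the initial low watermark,
# so the combined watermark never moves and windows never fire.
LOW = float("-inf")
watermarks = [LOW] * 10
watermarks[0] = 1_000_000
print(combined_watermark(watermarks))  # -inf

# Once every partition has seen data, the watermark can advance:
print(combined_watermark([1_000_000] * 10))  # 1000000
```

This is why either ensuring every partition receives data, or Flink 1.11's idleness handling (e.g. `WatermarkStrategy.withIdleness`, which excludes idle partitions from the minimum), unblocks the computation.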
----
From: "x"
Hi, do numpy arrays and DataTypes.ARRAY() share the same type format?
My udf outputs a numpy.array(dtype=str), and result_type is set to DataTypes.ARRAY(DataTypes.STRING()).
The output is insert_into'd into the sink, with with_format set to OldCsv().field('res', DataTypes.ARRAY(DataTypes.STRING())).
What gets printed is a string like [Ljava.lang.String;@f795710, not the contents of my np.array.
How can I solve this?
OK, thank you, I'll take a look~
-
Best Wishes
Hi all:
flink version : 1.11.0
Scenario: the upstream data comes from binlog. When the table structure changes, I'd like the schema of the Flink-internal table to update in real time, but as far as I can tell the schema is fixed at CREATE TABLE time. Is there any way to handle this scenario?
----
From: "user-zh"