Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-04 Thread Cristian
My suspicion is that somewhere in the path where it fails to connect to ZooKeeper, the exception is swallowed, so instead of running the shutdown path for when the job fails, the general shutdown path is taken. This was fortunately a job for which we had a savepoint from yesterday. Otherwise …

Re: Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-04 Thread Qingdong Zeng
Hi Cristian, in the log we can see it went to the method shutDownAsync(applicationStatus, null, true): 2020-09-04 17:32:07,950 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneApplicationClusterEntryPoint down …

Re: Out-of-order Kafka data consumption problem

2020-09-04 Thread ????????
??100,0100??100 0,100??(??100)??? ---- ??:

Re: A question about Flink SQL parallelism

2020-09-04 Thread faaron zheng
Hi, by default HiveTableSource infers its parallelism from the data size, which is what conflicts with the max parallelism you set. You can set table.exec.hive.infer-source-parallelism: false to turn this off. Best, Faaron Zheng. On 2020-09-04 15:29, me wrote: val tableConfig = tableEnv.getConfig.getConfiguration tableConfig.setString("table.exec.resource.default-parallelism","4") …
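
A minimal sketch of the suggestion above (the class name, batch mode, and Blink-planner setup are illustrative, not from the thread):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class HiveParallelismConfig {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
            Configuration conf = tEnv.getConfig().getConfiguration();
            // Disable parallelism inference for HiveTableSource ...
            conf.setBoolean("table.exec.hive.infer-source-parallelism", false);
            // ... so that the default parallelism below actually takes effect.
            conf.setString("table.exec.resource.default-parallelism", "4");
        }
    }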

Re: How to execute multiple SQL statements at once in the Flink SQL client

2020-09-04 Thread faaron zheng
Hi, the SQL client does not currently have this ability; it executes statements interactively. We previously built a beeline -e/-f style script on top of the sql-client; the main change was the place where jobs are submitted. Best, Faaron Zheng. On 2020-09-04 17:04, LittleFall wrote: I have a SQL file containing quite a few Flink SQL CREATE TABLE statements and queries, and I want to submit these jobs through the SQL client, but I can only copy-paste them one by one. Pasting several statements at once fails, and using source xxx.sql in the Flink SQL client also fails. …
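
A minimal sketch of the idea (this is not the modified sql-client script mentioned above): read a .sql file, split it naively on ';', and submit each statement with TableEnvironment.executeSql(). The file-path argument and the naive splitting are assumptions for illustration.

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SqlScriptRunner {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
            // args[0] is assumed to be the path of the .sql script.
            String script = new String(Files.readAllBytes(Paths.get(args[0])));
            for (String stmt : script.split(";")) {
                if (!stmt.trim().isEmpty()) {
                    tEnv.executeSql(stmt.trim()); // DDL runs synchronously, INSERTs are submitted as jobs
                }
            }
        }
    }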

Checkpoint metadata deleted by Flink after ZK connection issues

2020-09-04 Thread Cristian
Hello guys. We run a stand-alone cluster that runs a single job (if you are familiar with the way Ververica Platform runs Flink jobs, we use a very similar approach). It runs Flink 1.11.1 straight from the official Docker image. Usually, when our jobs crash for any reason, they will resume …

FLINK DATASTREAM Processing Question

2020-09-04 Thread Vijayendra Yadav
Hi Team, I have a generic question. Let's say I have 2 actions to be taken on a Flink DataStream (Kafka): 1) convert some data fields and write to an external database; 2) transform the converted data fields from #1 into a different record format, say Avro. *Here are two approaches that are possible.* a) One …
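
One possible shape for this, sketched below: convert once and reuse the converted stream for both the database write and the re-formatting step. The in-memory source, the print sinks, and the toy transformations stand in for the real Kafka source, database sink, and Avro encoder.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FanOutJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> raw = env.fromElements("a,1", "b,2");        // stand-in for the Kafka source
            DataStream<String> converted = raw.map(line -> line.toUpperCase()); // action 1: field conversion (toy)
            converted.print("to-database");                                  // stand-in for the external database sink
            converted.map(rec -> "avro:" + rec).print("to-avro");            // action 2: reformat (e.g. to Avro)
            env.execute("fan-out sketch");
        }
    }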

Speeding up CoGroup in batch job

2020-09-04 Thread Ken Krugler
Hi all, I added a CoGroup to my batch job, and it’s now running much slower, primarily due to back pressure from the CoGroup operator. I assume it’s because this operator is having to sort/buffer-to-disk all incoming data. Looks like about 1TB from one side of the join, currently very little

Re: Flink 1.11 TaskManagerLogFileHandler -Exception

2020-09-04 Thread Yun Tang
Hi, could you check the log4j.properties or related conf file used to generate the logs to see whether anything looks unexpected? You could also log in to the machine and use 'ps -ef | grep java' to find the command used to run the TaskManager, as Flink prints the location of 'taskmanager.log' [1] via …

Re: FLINK YARN SHIP from S3 Directory

2020-09-04 Thread Vijayendra Yadav
Thank you Gordon. I am still on 1.10.0; I will try 1.11.0 when I get there. Regards, Vijay. On Fri, Sep 4, 2020 at 12:52 AM Tzu-Li (Gordon) Tai wrote: > Hi, > > As far as I can tell from a recent change [1], this seems to be possible > now starting from Flink 1.11.x. Have you already tried this …

Re: Unit Test for KeyedProcessFunction with out-of-core state

2020-09-04 Thread Alexey Trenikhun
Hi Gordon, we already use [1]. Unfortunately it doesn't allow detecting out-of-core-specific bugs like this: POJO v = myMapState.get(myKey); v.setStatus(1); return; // missing myMapState.put(myKey, v); Thanks, Alexey. From: Tzu-Li (Gordon) Tai Sent: Friday, …
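
One way (an assumption on my part, not something confirmed in the thread) to make that kind of bug visible is to give the harness from [1] a RocksDB backend, so MapState.get() returns a deserialized copy rather than a shared heap reference and a missing put() shows up in the output. A minimal sketch with a toy counting function:

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    import org.apache.flink.util.Collector;

    public class RocksDbHarnessSketch {

        // Minimal function under test: counts events per key in MapState.
        static class CountFunction extends KeyedProcessFunction<String, String, Integer> {
            private transient MapState<String, Integer> counts;

            @Override
            public void open(Configuration parameters) {
                counts = getRuntimeContext().getMapState(
                        new MapStateDescriptor<>("counts", Types.STRING, Types.INT));
            }

            @Override
            public void processElement(String value, Context ctx, Collector<Integer> out) throws Exception {
                Integer current = counts.get(value);
                int updated = current == null ? 1 : current + 1;
                counts.put(value, updated); // dropping this put() is the kind of bug a RocksDB-backed harness exposes
                out.collect(updated);
            }
        }

        public static void main(String[] args) throws Exception {
            KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness =
                    new KeyedOneInputStreamOperatorTestHarness<>(
                            new KeyedProcessOperator<>(new CountFunction()),
                            value -> value,
                            Types.STRING);
            // RocksDB serializes state, so get() returns a copy; heap state would share the object reference.
            harness.setStateBackend(new RocksDBStateBackend("file:///tmp/rocksdb-harness-checkpoints"));
            harness.open();
            harness.processElement("acct-1", 1L);
            harness.processElement("acct-1", 2L);
            System.out.println(harness.getOutput()); // expect the emitted records 1 and 2
            harness.close();
        }
    }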

Re: Flink 1.11 TaskManagerLogFileHandler -Exception

2020-09-04 Thread aj
I have also checked the ports, and all ports from 0-65535 are open. I also do not see any taskmanager.log being generated under my container logs on the task machine. On Fri, Sep 4, 2020 at 2:58 PM aj wrote: > > I am trying to switch to Flink 1.11 with the new EMR release 6.1. I have > …

Re: Out-of-order Kafka data consumption problem

2020-09-04 Thread smq
To put it another way: the records written to Kafka are two updates for the same user, with balances 0 and 100. The computation I described earlier is done in Oracle; I am only responsible for writing the balance, i.e. 0 or 100, into the corresponding balance field in Kudu. Because consumption may be out of order, 100 might be applied first and then 0, leaving 0 in Kudu when the correct final value is 100. ---Original message--- From: "wwj"

Re: Job Manager taking long time to upload job graph on remote storage

2020-09-04 Thread Till Rohrmann
I am not sure at this point that the delay is caused by Flink. I would rather suspect that it has something to do with an external system. Maybe you could try profiling the job submission so that we see clearer where the time is spent. Other than that, there might be some options for the GCS

Flink: StreamingFileSink and catalog table question

2020-09-04 Thread MuChen
Hi all: using the DataStream API I consume Kafka into a DataStream ds1, register a Hive catalog on the tableEnv: tableEnv.registerCatalog(catalogName, catalog); tableEnv.useCatalog(catalogName); and then convert ds1 into a table: Table sourcetable = …

Flink 1.11: a question about StreamTableEnvironment.from()

2020-09-04 Thread Asahi Lee
Hi, when calling StreamTableEnvironment.from("") … package kafka; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import …

Re: Out-of-order Kafka data consumption problem

2020-09-04 Thread Xiao Xu
Two approaches: 1. You can key the data in Kafka; within a partition records are ordered, so each user is processed in order. 2. As you said, handle the out-of-order data inside Flink. On Fri, Sep 4, 2020 at 5:56 PM, 宁吉浩 wrote: > I ran into the same problem: two records with a causal relationship that must be processed in strict order, though my business is not as strict as a bank's; > my solution is to drop late data and run the business computation; > a separate program caches the data in memory, sorts it, and then corrects the computation; > this is the approach from our earlier realtime + offline warehouse: the code is written once but runs twice; > >
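
A minimal sketch of approach 1 (the topic name, bootstrap server, and record values are illustrative): produce the balance updates keyed by account id, so all updates for one account land in the same partition and therefore stay in order.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KeyedBalanceProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Same key ("acct-42") -> same partition -> per-account ordering is preserved.
                producer.send(new ProducerRecord<>("balance-updates", "acct-42", "balance=0"));
                producer.send(new ProducerRecord<>("balance-updates", "acct-42", "balance=100"));
            }
        }
    }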

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-04 Thread Peihui He
Hi all, once a partition is specified this problem can no longer be worked around through the path: CREATE TABLE MyUserTable ( column_name1 INT, column_name2 STRING, dt STRING ) PARTITIONED BY (dt) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///path/to/whatever', -- required: path to a …

Re: Job Manager taking long time to upload job graph on remote storage

2020-09-04 Thread Prakhar Mathur
Yes, we can try the same in 1.11. Meanwhile, is there any network- or thread-related config that we can tweak for this? On Fri, Sep 4, 2020 at 12:48 PM Till Rohrmann wrote: > From the log snippet it is hard to tell. Flink is not only interacting > with GCS but also with ZooKeeper to store a …

sql-client checkpoint

2020-09-04 Thread 引领
I'd like to try flink-cdc for cleaning data, but after experimenting I am stuck on a few points: 1) Using the sql-client with checkpointing enabled, if the program dies, how do I continue from the checkpoint and resume the job, especially for operations such as GROUP BY or COUNT? 2) If that is not possible, can I write code instead, specify the checkpoint on restart, and still consume through flink-cdc? 引领, yrx73...@163.com
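
A minimal sketch of option (2) above: enable checkpointing in code and retain checkpoints on cancellation, so the job can later be resumed with `flink run -s <retained-checkpoint-path>`. The interval, the RocksDB path, and the stand-in pipeline are assumptions, not taken from the thread.

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointedCdcJobSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);         // checkpoint every minute (assumed)
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints")); // assumed checkpoint path
            // Keep the last checkpoint after cancellation so the job can be restarted from it.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            env.fromElements(1, 2, 3).print(); // stand-in for the flink-cdc source and GROUP BY / COUNT pipeline
            env.execute("cdc job with checkpoints");
        }
    }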

flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-04 Thread Peihui He
Hi all, when I create a table with the Flink SQL client: CREATE TABLE MyUserTable ( column_name1 INT, column_name2 STRING ) PARTITIONED BY (part_name1, part_name2) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///path/to/whatever', -- required: path to a …

Re: Out-of-order Kafka data consumption problem

2020-09-04 Thread 宁吉浩
I ran into the same problem: two records with a causal relationship that must be processed in strict order, though my business is not as strict as a bank's; my solution is to drop late data and run the business computation; a separate program caches the data in memory, sorts it, and then corrects the computation; this is the approach from our earlier realtime + offline warehouse: the code is written once but runs twice. -- From: smq <374060...@qq.com> Sent: Friday, September 4, 2020 17:35 To: wwj ; user-zh Subject: …

A question about implementing dynamic rules with Flink

2020-09-04 Thread hold_li...@163.com
You can use Flink + Drools, with Drools handling the rules. 2020-9-4, hold_li...@163.com. On 2020-08-06 10:26, samuel@ubtrobot.com wrote: … Flink … …

Re: Please advise on a question about time windows, many thanks!

2020-09-04 Thread zilong xiao
Could you share which configuration option that is? Is there any documentation for it? On Fri, Sep 4, 2020 at 5:24 PM, superainbower wrote: > Version 1.11 added a new configuration option that avoids the problem where data skew leaves a partition with no data so the computation is never triggered. > superainbower, superainbo...@163.com > On 2020-09-04 15:11, taochanglian wrote: >

Flink 1.11 TaskManagerLogFileHandler -Exception

2020-09-04 Thread aj
I am trying to switch to Flink 1.11 with the new EMR release 6.1. I have created a 3-node EMR cluster with Flink 1.11. When I run my job it works fine; the only issue is that I am not able to see any logs in the job manager and task manager. I am seeing the exception below in the job manager's stdout …

Re: Please advise on a question about time windows, many thanks!

2020-09-04 Thread superainbower
Version 1.11 added a new configuration option that avoids the problem where data skew leaves a partition with no data so the computation is never triggered. superainbower, superainbo...@163.com. On 2020-09-04 15:11, taochanglian wrote: That's exactly it: if you have multiple partitions but only one of them contains data, the watermark will not advance and the computation won't run; you need data in every partition. For example, in my tests with one topic of 10 partitions, because it's a test environment and data is hashed by key …
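
The thread does not name the option; two candidates in 1.11 are the SQL setting table.exec.source.idle-timeout and, for the DataStream API, per-source idleness. A minimal sketch of the latter (the event type, timestamp field, and both durations are assumptions):

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    public class IdlenessSketch {
        // Minimal event type for the sketch.
        public static class MyEvent {
            public long eventTime;
        }

        public static WatermarkStrategy<MyEvent> strategy() {
            return WatermarkStrategy
                    .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, ts) -> event.eventTime) // assumed timestamp field
                    .withIdleness(Duration.ofMinutes(1));                  // idle partitions stop holding back the watermark
        }

        public static void main(String[] args) {
            System.out.println(strategy());
        }
    }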

How to execute multiple SQL statements at once in the Flink SQL client

2020-09-04 Thread LittleFall
I have a SQL file containing quite a few Flink SQL CREATE TABLE statements and queries, and I want to submit these jobs through the SQL client, but I can only copy-paste them one by one. Pasting several statements at once fails, and using source xxx.sql in the Flink SQL client also fails. What is a way to execute multiple statements in one go? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Out-of-order Kafka data consumption problem

2020-09-04 Thread smq
Hi all, I've hit an out-of-order processing problem. The business scenario is bank balance updates: the source is Kafka with an account balance field, and the sink is Kudu, where the customer balance is updated. If several operations on the same account arrive out of order, the stored balance can be wrong. For example, a customer has 100, spends 100, then deposits 100; processed in order the balance should still be 100. But if the deposit is processed first, the balance becomes 100-100+100=100, and only then is the spend record processed: since the spend actually happened earlier, that record's balance is 100-100=0, so the stored balance ends up wrong. To process the data by event time, can I keyBy first and then use watermarks?
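
A possible sketch of the keyBy idea in the last sentence (an illustration, not from the thread): key by account and let the update with the newest event timestamp win, so a late spend record cannot overwrite a newer balance. The BalanceUpdate type, the in-memory source, and the print sink stand in for the real Kafka source and Kudu upsert.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class LatestBalanceJob {
        // Minimal POJO standing in for the Kafka record.
        public static class BalanceUpdate {
            public String account;
            public long eventTime;
            public long balance;
            public BalanceUpdate() {}
            public BalanceUpdate(String account, long eventTime, long balance) {
                this.account = account; this.eventTime = eventTime; this.balance = balance;
            }
        }

        // Keeps, per account, only the update with the newest event timestamp.
        static class LatestWins extends KeyedProcessFunction<String, BalanceUpdate, BalanceUpdate> {
            private transient ValueState<Long> latestTime;

            @Override
            public void open(Configuration parameters) {
                latestTime = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("latestTime", Types.LONG));
            }

            @Override
            public void processElement(BalanceUpdate update, Context ctx, Collector<BalanceUpdate> out)
                    throws Exception {
                Long seen = latestTime.value();
                if (seen == null || update.eventTime > seen) {
                    latestTime.update(update.eventTime);
                    out.collect(update); // only newer updates reach the sink (e.g. the Kudu upsert)
                }
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements( // stand-in for the Kafka source; the deposit arrives before the earlier spend
                            new BalanceUpdate("acct-1", 2L, 100L),
                            new BalanceUpdate("acct-1", 1L, 0L))
                    .keyBy(u -> u.account)
                    .process(new LatestWins())
                    .print(); // stand-in for the Kudu sink; only the balance=100 update is emitted
            env.execute("latest balance wins sketch");
        }
    }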

Re: State Storage Questions

2020-09-04 Thread Tzu-Li (Gordon) Tai
Hi, On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley wrote: > Hello! > > I've been digging into State Storage documentation, but it's left me > scratching my head with a few questions. Any help will be much appreciated. > > Qs: > 1. Is there a way to use RocksDB state backend for Flink on AWS EMR? >

Re: Re: Re: Re: pyflink-udf issue feedback

2020-09-04 Thread Xingbo Huang
Hi, I recommend using DDL to declare the connectors of your source and sink: ``` table_env.execute_sql(""" CREATE TABLE output ( data STRING ARRAY ) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///tmp/test.csv', -- required: path to a directory 'format' = 'json', …

Re: Re: Re: Re: pyflink-udf issue feedback

2020-09-04 Thread whh_960101
Hello, I want to insert_into the output into a target table, specifically: st_env = StreamExecutionEnvironment.get_execution_environment(); st_env.connect(...) defines a source table (the table contains field a), then st_env.connect(FileSystem().path('tmp')) \ .with_format(OldCsv() .field('res', DataTypes.ARRAY(DataTypes.STRING( \ …

Re: FLINK YARN SHIP from S3 Directory

2020-09-04 Thread Tzu-Li (Gordon) Tai
Hi, As far as I can tell from a recent change [1], this seems to be possible now starting from Flink 1.11.x. Have you already tried this with the latest Flink version? Also including Klou in this email, who might be able to confirm this. Cheers, Gordon [1]

Re: Unit Test for KeyedProcessFunction with out-of-core state

2020-09-04 Thread Tzu-Li (Gordon) Tai
Hi Alexey, Is there a specific reason why you want to test against RocksDB? Otherwise, in Flink tests we use a `KeyedOneInputStreamOperatorTestHarness` [1] that allows you to wrap a user function and eliminate the need to worry about setting up heavy runtime context / dependencies such as the

Re: A question about Flink SQL parallelism

2020-09-04 Thread me
val tableConfig = tableEnv.getConfig.getConfiguration tableConfig.setString("table.exec.resource.default-parallelism","4") — I already added this table parallelism setting, but the job refuses to run because it is below the 104 parallelism: Vertex Source: HiveTableSource()'s parallelism (104) is higher than the max parallelism (4). Please lower the parallelism or increase …

Re: Fail to deploy Flink on minikube

2020-09-04 Thread Till Rohrmann
Great to hear that it works on K8s, and thanks for letting us know that the problem is likely caused by Minikube. Cheers, Till. On Fri, Sep 4, 2020 at 8:53 AM superainbower wrote: > Hi Till & Yang, > I can deploy Flink on kubernetes (not minikube), it works well > So there are some problem about my …

A question about Flink SQL parallelism

2020-09-04 Thread me
My code uses Flink SQL. It works fine when tested locally in IDEA, but once running on the cluster it occupies all 40 slots with a parallelism of 114, and the program is stuck with no output; it is a Flink SQL job with a time window. I set globally in the code dataStreamEnv.setParallelism(4) and dataStreamEnv.setMaxParallelism(4), but it seems to have no effect at all. How can I limit the parallelism of Flink SQL?

Re: Re: Re: pyflink-udf issue feedback

2020-09-04 Thread Xingbo Huang
Hi, do you want to see the results while debugging? You can call table.to_pandas() directly, or use the print connector. Personally I find to_pandas the simplest; for example you can try: ``` table = table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b']) @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ARRAY(DataTypes.STRING())) def func(a): return np.array([a, …

Re: Re: Please advise on a question about time windows, many thanks!

2020-09-04 Thread taochanglian
That's exactly it: if you have multiple partitions but only one of them contains data, the watermark will not advance and the computation won't run; you need data in every partition. For example, in my tests with one topic of 10 partitions, because data was hashed by key in the test environment it went to only one partition, so the computation never fired. Once the key was rotated so that all 10 partitions had data, the computation fired. On 2020/9/4 13:14, Benchao Li wrote: If you have multiple partitions and some partitions have no data, then indeed this problem exists.

Re: savepoint question

2020-09-04 Thread x
---- Original message ---- From: "x"

Re: Re: Re: pyflink-udf issue feedback

2020-09-04 Thread whh_960101
Hello, is a numpy array the same type/format as DataTypes.ARRAY()? My UDF outputs a numpy.array(dtype=str), with result_type set to DataTypes.ARRAY(DataTypes.STRING()). I insert_into the output into the sink, with with_format set to OldCsv().field('res', DataTypes.ARRAY(DataTypes.STRING())). The printed result is a string like [Ljava.lang.String;@f795710, not the contents of my np.array. How can I solve this?

Re: Re: Re: Re: Flink 1.9.1 with Prometheus Pushgateway monitoring: after the job is killed, Grafana still shows metrics

2020-09-04 Thread bradyMk
OK, thank you, I'll take a look~ - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/

How can Flink SQL handle upstream table schema changes

2020-09-04 Thread xiao cai
Hi all: Flink version: 1.11.0. Scenario: the upstream data comes from binlog. When the upstream table structure changes, we would like the schema of the corresponding internal Flink table to change in real time, but at the moment the table schema is fixed at CREATE TABLE time. Is there a way to handle this scenario?

Re: FlinkKafkaConsumer question

2020-09-04 Thread op
---- Original message ---- From: "user-zh"