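Issues integrating Flink with MySQL

2021-01-20 Thread Land
Hi all!

While integrating Flink with MySQL, I ran into the following problems:

1.
MySQLCatalog is not supported yet. To read or write MySQL tables, each table must first be declared via DDL, which is tedious when a table has many columns. I searched around and found that the community has not yet planned support for this. It would be great if we could push for this Catalog to be supported in 1.13.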
https://issues.apache.org/jira/browse/FLINK-15352
https://issues.apache.org/jira/browse/FLINK-15350

2. Inserting or updating only a subset of columns is not supported;
https://issues.apache.org/jira/browse/FLINK-18726

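3. Queries on large tables (for example, tables with several million rows) are very slow, possibly because filters are not pushed down;

4. Columns of type float cannot be updated in UPSERT mode. This looks like a bug.

MySQL is very widely used; resolving the issues above would make integrating it with Flink much easier.
Does anyone have suggestions on how to move this forward?

References: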
https://cwiki.apache.org/confluence/display/FLINK/1.13+Release



--
Sent from: http://apache-flink.147419.n8.nabble.com/

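Flink SQL: memory still spikes even when the statement has a very small LIMIT

2021-01-20 Thread zhang hao
A question: I am using Flink SQL to pull data from MySQL. The MySQL source table has tens of millions of rows, and I ran select * from sourcTable
limit 10;
Even a limit of a few rows causes memory to spike. Is the limit here applied in the Flink TaskManager, after the query has already been executed against the original MySQL table?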


Re: flink1.12 on yarn per-job runtime issue

2021-01-20 Thread chaos
Thanks for the reply. I don't see anything abnormal in the logs; I have attached them.
It runs fine in Yarn Session mode; Per-job and Application Mode do not work.




Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-20 Thread Jark Wu
Great examples to understand the problem and the proposed changes, @Kurt!

Thanks Leonard for investigating this problem.
The time-zone problems around time functions and windows have bothered a
lot of users. It's time to fix them!

The return value changes sound reasonable to me, and keeping the return
type unchanged will minimize the surprise to the users.
Besides that, I think it would be better to mention how this affects the
window behaviors, and the interoperability with DataStream.

I think this definitely deserves a FLIP.



Hi zhisheng,

Do you have examples to illustrate which case will get the wrong window
boundaries?
That will help to verify whether the proposed changes can solve your
problem.

Best,
Jark
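
For readers following the question about window boundaries, here is a minimal sketch in plain Java (no Flink dependency) of how the start of a one-day tumbling window falls out of an epoch timestamp. The class and method names are hypothetical; the arithmetic is the usual "align to a multiple of the window size, shifted by an offset" rule, which is believed to match what Flink's `TimeWindow.getWindowStartWithOffset` computes. The -8h offset for Beijing time is the manual workaround described in this thread, not part of the proposal.

```java
import java.time.Instant;

public class DayWindowSketch {
    public static final long HOUR_MS = 60L * 60 * 1000;
    public static final long DAY_MS = 24 * HOUR_MS;

    // Start (epoch millis) of the tumbling window that contains `timestamp`.
    // Windows are aligned to multiples of `windowSize`, shifted by `offset`.
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        // 2021-01-21 12:03:35 Beijing time is 2021-01-21T04:03:35Z.
        long ts = Instant.parse("2021-01-21T04:03:35Z").toEpochMilli();

        // No offset: the day window is aligned to UTC midnight,
        // which is 08:00 on the Beijing wall clock.
        System.out.println(Instant.ofEpochMilli(windowStart(ts, 0L, DAY_MS)));

        // Manual -8h offset: the day window is aligned to Beijing midnight.
        System.out.println(Instant.ofEpochMilli(windowStart(ts, -8 * HOUR_MS, DAY_MS)));
    }
}
```

With no offset the window containing the sample timestamp starts at 2021-01-21T00:00:00Z; with the -8h offset it starts at 2021-01-20T16:00:00Z, which is midnight in UTC+8.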


On Thu, 21 Jan 2021 at 12:54, zhisheng <173855...@qq.com> wrote:

> Thanks to Leonard Xu for discussing this tricky topic. At present, there
> are many Flink jobs in our production environment that are used to count
> day-level reports (eg: count PV/UV ).
>
>
> If we use Flink SQL's default behavior, the window time range of the
> statistics is incorrect, so the statistical results will naturally be
> incorrect.
>
>
> The user needs to deal with the time zone manually in order to solve the
> problem.
>
>
> If Flink itself can solve these time zone issues, then I think it will be
> user-friendly.
>
>
> Thank you
>
>
> Best!
> zhisheng
>
>
> -- Original message --
> From: "dev" <xbjt...@gmail.com>
> Sent: Tuesday, January 19, 2021, 6:35 PM
> To: "dev"
> Subject: Re: [DISCUSS] Correct time-related function behavior in Flink SQL
>
>
>
> I found that the format of the example above may get garbled in some mail
> clients, so I posted a picture here [1].
>
> Best,
> Leonard
>
> [1]
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png
>
>
>  On January 19, 2021, at 16:22, Leonard Xu wrote:
>  Hi, all
> 
>  I want to start the discussion about correcting time-related function
> behavior in Flink SQL, this is a tricky topic but I think it’s time to
> address it.
> 
>  Currently some temporal function behaviors are weird to users.
>  1. When users use a PROCTIME() in SQL, the value of PROCTIME()
> has a timezone offset with the wall-clock time in users' local time zone,
> users need to add their local time zone offset manually to get expected
> local timestamp(e.g: Users in Germany need to +1h to get expected local
> timestamp).
> 
>  2. Users can not use
> CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP to get wall-clock
> timestamp in local time zone, and thus they need write UDF in their SQL
> just for implementing a simple filter like WHERE date_col =
> CURRENT_DATE.
> 
>  3. Another common case is the time window with day
> interval based on PROCTIME(), user plan to put all data from one day into
> the same window, but the window is assigned using timestamp in UTC+0
> timezone rather than the session timezone which leads to the window starts
> with an offset(e.g: Users in China need to add -8h in their business sql
> start and then +8h when output the result, the conversion like a magic for
> users).
> 
>  These problems come from that lots of time-related functions like
> PROCTIME(), NOW(), CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP are
> returning time values based on UTC+0 time zone.
> 
>  This topic will lead to a comparison of the three types, i.e.
> TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE and
> TIMESTAMP WITH TIME ZONE. In order to better understand the three types, I
> wrote a document[1] to help understand them better. You can also learn how the
> three timestamp types behave in the Hadoop ecosystem from the reference link
> in the doc.
> 
> 
>  I investigated the current behavior of all Flink time-related functions and
> compared with other DB vendors like Pg,Presto, Hive, Spark,
> Snowflake, I made an excel [2] to organize them well, we can use it
> for the next discussion. Please let me know if I missed something.
>  From my investigation, I think we need to correct the behavior of
> function NOW()/PROCTIME()/CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP, to
> correct them, we can change the function return type or function return
> value or change return type and return value both. All of those ways are
> valid because SQL:2011 does not specify the function return type and every
> SQL engine vendor has its own implementation. For example the
> CURRENT_TIMESTAMP function,
> 
>  FLINK current behavior | existing problem | other vendors' behavior | proposed change
>  CURRENT_TIMESTAMP  CURRENT_TIMESTAMP
>  TIMESTAMP(0) NOT NULL
> 
>  #session timezone: UTC
>  2020-12-28T23:52:52
> 
>  #session timezone: UTC+8
>  2020-12-28T23:52:52
> 
>  wall clock:
>  UTC+8: 

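Re: flink1.12 on yarn per-job runtime issue

2021-01-20 Thread chaos
Thanks for the reply.
The CDH cluster has 160G of memory and 64 cores. It mainly runs offline jobs at night. I did not specify any resource-related configuration when submitting the program.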




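Problem integrating Flink with Hive

2021-01-20 Thread 赵一旦
I copied the production hive-site file into Flink's conf directory and put all the related jars in place.
I then started sql-client, specifying the related jars with its -l option.

After that, catalogs, databases, tables and other metadata could all be queried.

But select * from xxTable where dt=''; fails.
Looking at the errors on the Flink cluster, this error directly causes Flink's standalonesession process to fail.
The error is as follows: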


2021-01-21 13:43:42,818 INFO  org.apache.hadoop.fs.bos.BaiduBosFileSystem
   [] - re-open at specific locaition: 0
...skipping...
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.12.0.jar:1.12.0]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the
operator coordinators
at
org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1100)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
~[?:1.8.0_251]
... 27 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at
org.apache.hadoop.hive.common.ValidReadTxnList.readFromString(ValidReadTxnList.java:142)
~[?:?]
at
org.apache.hadoop.hive.common.ValidReadTxnList.<init>(ValidReadTxnList.java:57)
~[?:?]
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$Context.<init>(OrcInputFormat.java:421)
~[?:?]
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:983)
~[?:?]
at
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
~[?:?]
at
org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:86)
~[?:?]
at
org.apache.flink.connectors.hive.HiveSourceFileEnumerator.enumerateSplits(HiveSourceFileEnumerator.java:57)
~[?:?]
at
org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:140)
~[flink-table_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.connectors.hive.HiveSource.createEnumerator(HiveSource.java:115)
~[?:?]
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:119)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:308)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:72)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:182)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1094)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
~[?:1.8.0_251]
... 27 more


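Re: Reply: Can Flink 1.11 SQL support Kafka dynamic partition discovery?

2021-01-20 Thread zhisheng
It is not in the 1.11 docs, so it is probably not supported. You could look at how 1.12 implements it and apply that patch to your own internal Flink version.

sunfulin wrote on Wed, Jan 20, 2021 at 2:53 PM:

>
> I looked at the source code and the parameter does seem to exist. I am not yet sure whether configuring it in the SQL DDL takes effect; I will verify later.
>
>
>
>
> --
> Sent from my NetEase Mail mobile app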
> 
>
>
> - Original Message -
> From: "Shuai Xia" 
> To: user-zh , sunfulin0321  >
> Sent: Wed, 20 Jan 2021 14:42:36 +0800
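> Subject: Reply: Can Flink 1.11 SQL support Kafka dynamic partition discovery?
>
> Hi, check whether FlinkKafkaConsumerBase uses it; if it does, then it is supported.
>
>
> --
> From: sunfulin
> Sent: Wednesday, January 20, 2021, 14:40
> To: user-zh
> Subject: Can Flink 1.11 SQL support Kafka dynamic partition discovery?
>
>
> hi,
>
> Experts, a question: how can a 1.11 SQL job achieve dynamic partition discovery? I found a parameter for it in the 1.12 docs, but it does not seem to exist in 1.11. I would like to confirm whether it is supported.
>
>
>
>
> --
> Sent from my NetEase Mail mobile app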


Re: flink heartbeat timeout

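2021-01-20 Thread Xintong Song
1. A 50s timeout should normally be sufficient. I suggest checking whether there was network jitter in the environment at the time of the timeout, or whether the JM/TM processes became unresponsive because of long GC pauses.
2. Currently, Flink cluster configuration cannot be hot-updated without a restart.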

Thank you~

Xintong Song



On Thu, Jan 21, 2021 at 11:39 AM guoxb__...@sina.com 
wrote:

> Hi
>
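> *Problem description:*
>
> I am running a streaming job with Flink. My program was started at 21:00 last night; at that point it looked normal and data was being processed normally. When I checked at 9:00 this morning, the job had been restarted automatically. The log shows the following error:
>
> Judging from the error, it was caused by a timeout. From what I have read, the parameter to adjust is
> heartbeat.timeout, whose default the official docs give as 5, but changing it would require restarting the Flink service, which is not allowed in our production environment.
>
> *Questions:*
> 1. The cause above is only a guess so far; I have not pinned down the actual problem. I would appreciate pointers from anyone with experience. Many thanks.
> 2. If I really do need to set heartbeat.timeout, how can I do it without restarting the Flink cluster? Many thanks.
> Note:
> My Flink version is 1.11.0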
> --
> guoxb__...@sina.com
>


Re: Locating a PyFlink JVM Metaspace memory leak

2021-01-20 Thread Xintong Song
cc @Jark
This looks like a JDBC connector issue. Are you familiar with this area, or do you know who is?

Thank you~

Xintong Song



On Wed, Jan 20, 2021 at 8:07 PM YueKun  wrote:

> hi, I am not sure whether the image is visible; the analysis of the data exported with jmap is as follows: <
> http://apache-flink.147419.n8.nabble.com/file/t1276/WX20210120-191436.png>
>
>
>
>
>


Re: Is there a way to get partition and offset information from the Flink SQL Kafka connector?

2021-01-20 Thread HunterXHunter
CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);





Flink DDL SQL gives different results when run in a Test versus in Main

2021-01-20 Thread HunterXHunter
The same code runs normally in main, but exits immediately when run in a Test.

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

bsTableEnv.executeSql(
    DDLSourceSQLManager.createStreamFromKafka(
        "localhost:9092", "test", "test", "test", "json"));

bsTableEnv.executeSql(
    com.ddlsql.DDLSourceSQLManager.createDynamicPrintlnRetractSinkTbl("printlnRetractSink"));
bsTableEnv.executeSql(
    "insert into printlnRetractSink select msg,count(*) as cnt from test group by msg");





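Re: State data cannot be found after the job starts, even though the state storage location is set

2021-01-20 Thread zhisheng
You configured a local directory, not an HDFS directory. After a restart, the new task may run on a different machine than before, and the previous job's state (which lives on the other machine) is not present on the new machine, so the state files cannot be found. I recommend configuring an HDFS path instead.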

Best
zhisheng
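
The distinction between a node-local path and a shared one comes down to the URI scheme of the checkpoint directory. The following is a minimal sketch in plain Java, with no Flink dependency, that flags a checkpoint URI as node-local. The class name, the method name, and the sample `hdfs:///flink/checkpoints` path are hypothetical and only serve as illustration; the `file:///data/flink/checkpoints` URI is the one from the question.

```java
import java.net.URI;

public class CheckpointPathSketch {
    // Returns true when the URI resolves against the local disk of whichever
    // machine happens to run the task, so it is not shared across the cluster.
    public static boolean isNodeLocal(String checkpointUri) {
        String scheme = URI.create(checkpointUri).getScheme();
        // A missing scheme (a bare path) is treated as local in this sketch.
        return scheme == null || scheme.equalsIgnoreCase("file");
    }

    public static void main(String[] args) {
        // The path from the question: local to each machine.
        System.out.println(isNodeLocal("file:///data/flink/checkpoints"));
        // A hypothetical HDFS path: visible from every machine.
        System.out.println(isNodeLocal("hdfs:///flink/checkpoints"));
    }
}
```

A check like this could run before job submission to catch a local checkpoint directory on a multi-node YARN cluster.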

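刘海 wrote on Wed, Jan 20, 2021 at 9:05 PM:

> Hi all
> I have run into a problem and hope someone can help:
> I set the state storage location via env.setStateBackend(new
> RocksDBStateBackend("file:///data/flink/checkpoints")); but after the job starts I cannot find the state data.
>
>
> This is flink1.12 in yarn per-job mode; my configuration is below. After the job starts, I cannot find the directory
> "/data/flink/checkpoints" on the server. Since I have set the state storage location, shouldn't state data appear there as soon as the job is running?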
>
>
> public class FlinkTestDemo {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(6);
>         env.getConfig().setAutoWatermarkInterval(200);
>         env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints"));
>         EnvironmentSettings bsSettings =
>             EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
>
>         bsTableEnv.getConfig().getConfiguration().set(
>             ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
>         CheckpointConfig config = env.getCheckpointConfig();
>         config.enableExternalizedCheckpoints(
>             CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>         bsTableEnv.getConfig().getConfiguration().set(
>             ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));
>
>         Configuration configuration = bsTableEnv.getConfig().getConfiguration();
>         configuration.setString("table.exec.mini-batch.enabled", "true");
>         configuration.setString("table.exec.mini-batch.allow-latency", "6000");
>         configuration.setString("table.exec.mini-batch.size", "5000");
>
> 刘海
> liuha...@163.com


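Re: flink1.12 on yarn per-job runtime issue

2021-01-20 Thread zhisheng
You should provide the JM logs. Please check whether the JM logs contain any exceptions. We hit a similar problem where a dependency conflict caused the job's resource request to fail, and the job stayed in the
created state.

Best
zhisheng

花乞丐 wrote on Thu, Jan 21, 2021 at 8:47 AM:

> Please post the parameters you used to submit the program, as well as your machine configuration. From the above, it looks like not enough resources were allocated!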
>
>
>


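Re: PyFlink reports "Failed to execute sql" after submitting a Batch job

2021-01-20 Thread YueKun
Closing out this issue: it has not recurred. One of the following two changes probably resolved it:
1. Upgraded Flink from 1.12.0 to 1.12.1
2. Increased the Task Off-Heap memory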




Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-20 Thread Kurt Young
cc this to user & user-zh mailing list because this will affect lots of
users, and also quite a lot of users
were asking questions around this topic.

Let me try to understand this from user's perspective.

Your proposal will affect five functions, which are:

   - PROCTIME()
   - NOW()
   - CURRENT_DATE
   - CURRENT_TIME
   - CURRENT_TIMESTAMP

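Before the changes, as I am writing this reply, the local time here is
*2021-01-21 12:03:35 (Beijing time, UTC+8)*.
And I tried these 5 functions in sql client, and got:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 |   2021-01-21 | 04:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+

After the changes, the expected behavior will change to:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 |   2021-01-21 | 12:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
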
The return type of now(), proctime() and CURRENT_TIMESTAMP will still be
TIMESTAMP.
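
The gap between the current and the expected values can be reproduced with plain `java.time`, independent of Flink. The sketch below renders one instant as a zone-less wall-clock timestamp, first in UTC and then in a session time zone. The class and method names are hypothetical, and `Asia/Shanghai` is assumed here as the session time zone for Beijing time.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class SessionTimeZoneSketch {
    // Render an instant as a zone-less wall-clock timestamp in the given zone.
    public static LocalDateTime wallClock(Instant instant, ZoneId zone) {
        return LocalDateTime.ofInstant(instant, zone);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2021-01-21T04:03:35.228Z");

        // Value based on UTC+0, as the functions return today.
        System.out.println(wallClock(now, ZoneId.of("UTC")));           // 2021-01-21T04:03:35.228

        // Value in the session time zone, as expected after the change.
        System.out.println(wallClock(now, ZoneId.of("Asia/Shanghai"))); // 2021-01-21T12:03:35.228
    }
}
```

Both printed values describe the same instant; only the zone used to derive the wall-clock fields differs, which is why the return type can stay TIMESTAMP while the value changes.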

Best,
Kurt


On Tue, Jan 19, 2021 at 6:42 PM Leonard Xu  wrote:

> I found that the format of the example above may get garbled in some mail
> clients, so I posted a picture here [1].
>
> Best,
> Leonard
>
> [1]
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png
>
>
> > On January 19, 2021, at 16:22, Leonard Xu wrote:
> >
> > Hi, all
> >
> > I want to start the discussion about correcting time-related function
> behavior in Flink SQL, this is a tricky topic but I think it’s time to
> address it.
> >
> > Currently some temporal function behaviors are weird to users.
> > 1.  When users use a PROCTIME() in SQL, the value of PROCTIME() has a
> timezone offset with the wall-clock time in users' local time zone, users
> need to add their local time zone offset manually to get expected local
> timestamp(e.g: Users in Germany need to +1h to get expected local
> timestamp).
> >
> > 2. Users can not use CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP  to get
> wall-clock timestamp in local time zone, and thus they need write UDF in
> their SQL just for implementing a simple filter like WHERE date_col =
> CURRENT_DATE.
> >
> > 3. Another common case  is the time window  with day interval based on
> PROCTIME(), user plan to put all data from one day into the same window,
> but the window is assigned using timestamp in UTC+0 timezone rather than
> the session timezone which leads to the window starts with an offset(e.g:
> Users in China need to add -8h in their business sql start and then +8h
> when output the result, the conversion like a magic for users).
> >
> > These problems come from that lots of time-related functions like
> PROCTIME(), NOW(), CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP are
> returning time values based on UTC+0 time zone.
> >
> > This topic will lead to a comparison of the three types, i.e.
> TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE and
> TIMESTAMP WITH TIME ZONE. In order to better understand the three types, I
> wrote a document[1] to help understand them better. You can also learn how the
> three timestamp types behave in the Hadoop ecosystem from the reference link
> in the doc.
> >
> >
> > I investigated the current behavior of all Flink time-related functions and
> compared with other DB vendors like Pg,Presto, Hive, Spark, Snowflake,  I
> made an excel [2] to organize them well, we can use it for the next
> discussion. Please let me know if I missed something.
> > From my investigation, I think we need to correct the behavior of
> function NOW()/PROCTIME()/CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP, to
> correct them, we can change the function return type or function return
> value or change return type and return value both. All of those ways are
> valid because SQL:2011 does not specify the function return type and every
> SQL engine vendor has its own implementation. For example the
> CURRENT_TIMESTAMP 

Re: Is there a way to get partition and offset information from the Flink SQL Kafka connector?

2021-01-20 Thread Evan
Hi, yes, you can get them:
CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

You can find the information you need in the official docs:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html#available-metadata
 
Hope this helps.



 
From: gimlee
Sent: 2021-01-21 11:20
To: user-zh
Subject: Is there a way to get partition and offset information from the Flink SQL Kafka connector?
As the title says, I need to get the Kafka partition and offset for processing
 
 
 
 


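Re: Error submitting a job in Flink yarn application mode

2021-01-20 Thread Yang Wang
This error is most likely caused by a misconfiguration in your HDFS core-site.xml.

defaultFS should be hdfs://localhost:9000/; please check it.

Best,
Yang

casel.chen wrote on Tue, Jan 19, 2021 at 6:19 PM:

> Today I tried yarn
> application mode (with the yarn.provided.lib.dirs parameter) and uploaded the jars under $FLINK_LIB to HDFS, but got the error below. Is a jar or a configuration file missing?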
>
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
> deploy Yarn Application Cluster
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:443)
>
> at
> com.huifu.streamsql.launcher.yarn.YarnApplicationExecutor.doStart(YarnApplicationExecutor.java:70)
>
> at
> com.huifu.streamsql.launcher.AbstractJobExecutor.start(AbstractJobExecutor.java:76)
>
> at com.huifu.streamsql.launcher.JobCommand$1.execute(JobCommand.java:12)
>
> at com.huifu.streamsql.launcher.SubmitJobMain.main(SubmitJobMain.java:39)
>
> Caused by: java.lang.IllegalArgumentException: Wrong FS:
> hdfs://localhost:9000/flinkLib, expected: file:///
>
> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:665)
>
> at
> org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:86)
>
> at
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:630)
>
> at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:861)
>
> at
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:625)
>
> at
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
>
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1437)
>
> at
> org.apache.flink.yarn.YarnApplicationFileUploader.lambda$getAllFilesInProvidedLibDirs$2(YarnApplicationFileUploader.java:429)
>
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:94)
>
> at java.util.ArrayList.forEach(ArrayList.java:1259)
>
> at
> org.apache.flink.yarn.YarnApplicationFileUploader.getAllFilesInProvidedLibDirs(YarnApplicationFileUploader.java:426)
>
> at
> org.apache.flink.yarn.YarnApplicationFileUploader.<init>(YarnApplicationFileUploader.java:109)
>
> at
> org.apache.flink.yarn.YarnApplicationFileUploader.from(YarnApplicationFileUploader.java:354)
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:710)
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:558)
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:436)
>
> ... 4 more
>
>
>
>


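Re: Problem submitting a job in Flink yarn application mode

2021-01-20 Thread Yang Wang
Currently the user jar can be remote, but only with a Hadoop-compatible scheme,
because the remote user jar is not downloaded to the Flink client; it is registered directly as a Yarn local resource.

So the error you are seeing is expected, and this is not supported yet.

Best,
Yang

casel.chen wrote on Wed, Jan 20, 2021 at 10:23 AM: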

> ./bin/flink run-application -t yarn-application \
>
>   -Dyarn.provided.lib.dirs="hdfs://localhost:9000/flinkLib" \
>
>   hdfs://localhost:9000/flinkJobs/TopSpeedWindowing.jar
>
>
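> Running the command this way works.
>
>
> On 2021-01-20 10:21:32, "casel.chen" wrote:
> >I used the command below to submit a Flink job to run on yarn, and it failed. If I change the job jar path to a local one, there is no problem. I have already put
> flink-oss-fs-hadoop-1.12.0.jar into Flink's
> lib directory and set the oss parameters in the flink.conf configuration file. Does Flink really not support job jars that live on a remote distributed file system?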
> >
> >
> >./bin/flink run-application -t yarn-application \
> >
> >  -Dyarn.provided.lib.dirs="oss://odps-prd/rtdp/flinkLib" \
> >
> >  oss://odps-prd/rtdp/flinkJobs/TopSpeedWindowing.jar
> >
> >
> >
> >
> >
> > The program finished with the following exception:
> >
> >
> >
> >
> >org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
> deploy Yarn Application Cluster
> >
> >at
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:443)
> >
> >at
> org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:64)
> >
> >at
> org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:207)
> >
> >at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:974)
> >
> >at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
> >
> >at java.security.AccessController.doPrivileged(Native Method)
> >
> >at javax.security.auth.Subject.doAs(Subject.java:422)
> >
> >at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> >
> >at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >
> >at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
> >
> >Caused by: java.io.IOException: No FileSystem for scheme: oss
> >
> >at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2799)
> >
> >at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2810)
> >
> >at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
> >
> >at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
> >
> >at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
> >
> >at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
> >
> >at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
> >
> >at
> org.apache.flink.yarn.Utils.lambda$getQualifiedRemoteSharedPaths$1(Utils.java:577)
> >
> >at
> org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:127)
> >
> >at org.apache.flink.yarn.Utils.getRemoteSharedPaths(Utils.java:585)
> >
> >at
> org.apache.flink.yarn.Utils.getQualifiedRemoteSharedPaths(Utils.java:573)
> >
> >at
> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:708)
> >
> >at
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:558)
> >
> >at
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:436)
> >
> >... 9 more
>


flink heartbeat timeout

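2021-01-20 Thread guoxb__...@sina.com
Hi

Problem description:

I am running a streaming job with Flink. My program was started at 21:00 last night; at that point it looked normal and data was being processed normally. When I checked at 9:00 this morning, the job had been restarted automatically. The log shows the following error:


Judging from the error, it was caused by a timeout. From what I have read, the parameter to adjust is heartbeat.timeout, whose default the official docs give as 5, but changing it would require restarting the Flink service, which is not allowed in our production environment.

Questions:
1. The cause above is only a guess so far; I have not pinned down the actual problem. I would appreciate pointers from anyone with experience. Many thanks.
2. If I really do need to set heartbeat.timeout, how can I do it without restarting the Flink cluster? Many thanks.
Note:
My Flink version is 1.11.0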


guoxb__...@sina.com


Is there a way to get partition and offset information from the Flink SQL Kafka connector?

2021-01-20 Thread gimlee
As the title says, I need to get the Kafka partition and offset for processing





Re: Error triggering a savepoint in yarn-per-job mode

2021-01-20 Thread guanyq
./bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
./bin/flink savepoint fea3d87f138ef4c260ffe9324acc0e51 [:targetDirectory] 
application_1610788069646_0021



[:targetDirectory] 

hdfs:///flink/savepoints











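On 2021-01-21 10:24:31, "刘海" wrote:
>Hi
> I am currently testing savepoints, and the command I run reports the error below. The error indicates a timeout, but there is no further information. If anyone knows the likely cause, I would appreciate some pointers. Thanks.
>
>
>flink1.12 yarn-per-job mode
>jobID:fea3d87f138ef4c260ffe9324acc0e51  
>yarnID : application_1610788069646_0021 
>The command I ran is as follows: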
>./bin/flink savepoint -t yarn-per-job -D 
>yarn.application.id=application_1610788069646_0021 
>fea3d87f138ef4c260ffe9324acc0e51
>
>
>The error is as follows:
>
>
>org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
>fea3d87f138ef4c260ffe9324acc0e51 failed.
>at 
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:712)
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:690)
>at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919)
>at 
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:687)
>at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:989)
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
>Caused by: java.util.concurrent.TimeoutException
>at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168)
>at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549)
>at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>at java.lang.Thread.run(Thread.java:748)
>
>
>Best regards!
>刘海
>liuha...@163.com


Re: Error writing the decimal type to Hive from Flink

2021-01-20 Thread Rui Li
Hi, have you set the table.exec.hive.fallback-mapred-writer option? Try setting it to true and see whether that helps.

On Wed, Jan 20, 2021 at 4:39 PM kandy.wang  wrote:

> java.lang.NoSuchMethodError: org.apache.hadoop.hive.serde2.io.HiveDecimalWritable.serialize64(I)J
> at org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1010)
> at org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:99)
> at org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:159)
> at org.apache.orc.impl.writer.StructTreeWriter.writeRootBatch(StructTreeWriter.java:56)
> at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:557)
> at org.apache.flink.orc.writer.OrcBulkWriter.addElement(OrcBulkWriter.java:58)
> at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:589)
> at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:585)
> at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
> at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:209)
> at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
> at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
> at org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at StreamExecCalc$154.processElement(Unknown Source)
>
> We are using flink-sql-connector-hive-2.3.6_2.11-1.12-SNAPSHOT.jar, and our company's Hive is also this version. What could be causing this?



-- 
Best regards!
Rui Li


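Error when triggering a savepoint in yarn-per-job mode

2021-01-20 Post by 刘海
Hi,
I am currently testing savepoints, and the command below fails with the following error. Judging from the error it is a timeout, but there is no further information. If anyone knows the likely cause, I would appreciate some pointers. Thanks!


Flink 1.12, yarn-per-job mode
jobID: fea3d87f138ef4c260ffe9324acc0e51
yarnID: application_1610788069646_0021
The command I ran is:
./bin/flink savepoint -t yarn-per-job -D yarn.application.id=application_1610788069646_0021 fea3d87f138ef4c260ffe9324acc0e51


The error is: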


org.apache.flink.util.FlinkException: Triggering a savepoint for the job fea3d87f138ef4c260ffe9324acc0e51 failed.
at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:712)
at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:690)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:687)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:989)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: java.util.concurrent.TimeoutException
at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168)
at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Best regards!
刘海
liuha...@163.com

Flink 1.12.0: consuming from a Kafka 0.10 cluster and writing to a Kafka 0.9 cluster

2021-01-20 Post by guanyq
How should I choose the version of the Kafka connector?
With version 1.12.0, FlinkKafkaProducer09/FlinkKafkaConsumer09 no longer exist.

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
</dependency>


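Re: Problem running Flink 1.12 on YARN in per-job mode

2021-01-20 Post by 花乞丐
Please post the parameters you used to submit the program, as well as your machine configuration. Judging from the above, not enough resources were allocated!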




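State data cannot be found after the job starts, even though a state storage location is set

2021-01-20 Post by 刘海
Hi all,
I have run into a problem and hope someone can help:
I set the state storage location with env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints")); but after the job starts I cannot find any state data.


Flink 1.12, yarn per-job mode. My configuration is below. After the job starts, I cannot find the directory "/data/flink/checkpoints" on the server. Since I have set a state storage location, shouldn't state data appear there as soon as the job is running?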


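import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkTestDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint interval in milliseconds (value as posted).
        env.enableCheckpointing(6);
        env.getConfig().setAutoWatermarkInterval(200);
        // RocksDB state backend; checkpoints are written under this URI.
        env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints"));
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

        bsTableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        // Keep externalized checkpoints when the job is cancelled.
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        bsTableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));

        // Mini-batch settings for the Table/SQL runtime.
        Configuration configuration = bsTableEnv.getConfig().getConfiguration();
        configuration.setString("table.exec.mini-batch.enabled", "true");
        configuration.setString("table.exec.mini-batch.allow-latency", "6000");
        configuration.setString("table.exec.mini-batch.size", "5000");
        // (the rest of the job was not included in the original message)
    }
}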
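
One way to check whether any checkpoint has actually been written is to look for chk-<n> directories under the configured root. The sketch below does this with plain Java NIO. It assumes Flink's usual layout of <root>/<job-id>/chk-<n>; CheckpointDirLister and findCheckpointDirs are hypothetical names used for illustration and are not part of Flink. A file:// URI refers to the local filesystem of whichever machine runs the JobManager or TaskManager, so under YARN the check would presumably have to run on those hosts rather than on the client machine.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: lists chk-<n> directories under a checkpoint root.
public class CheckpointDirLister {

    /** Returns every directory named chk-* at most two levels below root, sorted by path. */
    public static List<Path> findCheckpointDirs(Path root) {
        if (!Files.isDirectory(root)) {
            // The root does not exist on this machine at all.
            return Collections.emptyList();
        }
        // Depth 1 is assumed to be the job id, depth 2 the chk-<n> directories.
        try (Stream<Path> paths = Files.walk(root, 2)) {
            return paths
                    .filter(Files::isDirectory)
                    .filter(p -> p.getFileName() != null
                            && p.getFileName().toString().startsWith("chk-"))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Defaults to the path from the message; pass another root as the first argument.
        Path root = Paths.get(args.length > 0 ? args[0] : "/data/flink/checkpoints");
        List<Path> found = findCheckpointDirs(root);
        if (found.isEmpty()) {
            System.out.println("No chk-* directories found under " + root);
        } else {
            found.forEach(System.out::println);
        }
    }
}
```

An empty result on one host does not by itself show that checkpointing is broken; it only says that nothing was written under that path on that particular machine.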

刘海
liuha...@163.com

Re: Locating a JVM Metaspace memory leak in PyFlink

2021-01-20 Post by YueKun
Hi, I am not sure whether the image is visible. The analysis of the jmap dump looks like this:
 





Re: Locating a JVM Metaspace memory leak in PyFlink

2021-01-20 Post by YueKun
I implemented it with the Python Table API. The connector is jdbc, and the jars are the ones linked from the Flink website (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html): flink-connector-jdbc_2.11-1.12.0.jar and mysql-connector-java-5.1.49.jar.

The job is written in SQL and is essentially the same as the demo code provided by Flink:
"
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);
"

The analysis of the jmap dump looks like this:
 






Re: Locating a JVM Metaspace memory leak in PyFlink

2021-01-20 Post by Xintong Song
Who creates the JDBC connection? Can you find the relevant call stack? Is it the connector provided by Flink, or user code?

Thank you~

Xintong Song



On Wed, Jan 20, 2021 at 6:32 PM YueKun  wrote:

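> It currently looks like the leak is caused by the MySQL JDBC driver, the same problem as
>
> http://apache-flink.147419.n8.nabble.com/1-11-1-OutOfMemoryError-Metaspace-td8367.html#a8399
> Is there a fix for this? Do I need to switch to a different mysql-connector-java version? I am currently using 5.1.49.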
>
>
>
>


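Re: Locating a JVM Metaspace memory leak in PyFlink

2021-01-20 Post by YueKun
It currently looks like the leak is caused by the MySQL JDBC driver, the same problem as
http://apache-flink.147419.n8.nabble.com/1-11-1-OutOfMemoryError-Metaspace-td8367.html#a8399
Is there a fix for this? Do I need to switch to a different mysql-connector-java version? I am currently using 5.1.49.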





Re: Reply: 1.11.1 reports an OutOfMemoryError: Metaspace. error

2021-01-20 Post by YueKun
Hello, has this problem been solved? I am currently running into the same situation.




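Problem running Flink 1.12 on YARN in per-job mode

2021-01-20 Post by chaos
The demos all run fine, but when I run my own program the tasks stay in the CREATED state.

  

The YARN cluster has plenty of resources.

In the Exceptions tab of the Flink web UI I see: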

java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:195)
at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:147)
at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:84)
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:87)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:84)
... 24 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 30 ms
... 25 more






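Re: Can UDTAGG be used in SQL, and what is the syntax?

2021-01-20 Post by jiangwan
Hello, I have a use case: computing the TopN of a field over a preceding period of time, which requires combining an over window with a UDTAGG.
According to the official documentation, UDTAGG is not supported in Flink SQL, and in the Table API an over window can only be followed by a select statement. Is there any other way to do this?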
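
For reference, the per-window logic that such a TopN function needs can be sketched in plain Java as below. This is only the accumulate/emit core built on a min-heap. It is not Flink's table aggregate function API, and TopNAccumulator, accumulate and emit are hypothetical names chosen for illustration. How to attach this logic to an over window is exactly the open question above and is not addressed here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: keeps the n largest values seen so far.
public class TopNAccumulator {

    private final int n;
    // Min-heap: the smallest of the retained values is always at the head.
    private final PriorityQueue<Long> heap = new PriorityQueue<>();

    public TopNAccumulator(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.n = n;
    }

    /** Adds one value, evicting the current minimum once more than n values are held. */
    public void accumulate(long value) {
        if (heap.size() < n) {
            heap.add(value);
        } else if (value > heap.peek()) {
            heap.poll();
            heap.add(value);
        }
    }

    /** Returns the retained values in descending order. */
    public List<Long> emit() {
        List<Long> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder());
        return result;
    }

    public static void main(String[] args) {
        TopNAccumulator acc = new TopNAccumulator(3);
        for (long v : new long[] {5, 1, 9, 7, 3}) {
            acc.accumulate(v);
        }
        System.out.println(acc.emit());
    }
}
```

Each accumulate call costs O(log n), so the accumulator stays cheap even when the window holds many rows.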




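Re: Parallelism of Flink writing to Hive can only be 1

2021-01-20 Post by zhuxiaoshang
Hi,
The operator with parallelism 1 is the discard sink; the data is actually written by the filewriter operator.

> On Jan 20, 2021, at 4:29 PM, 高函 wrote:
> 
> 
> Why can the parallelism of Flink writing to Hive currently only be set to 1?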



Error when Flink writes a decimal type to Hive

2021-01-20 Post by kandy.wang
java.lang.NoSuchMethodError: org.apache.hadoop.hive.serde2.io.HiveDecimalWritable.serialize64(I)J
at org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1010)
at org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:99)
at org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:159)
at org.apache.orc.impl.writer.StructTreeWriter.writeRootBatch(StructTreeWriter.java:56)
at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:557)
at org.apache.flink.orc.writer.OrcBulkWriter.addElement(OrcBulkWriter.java:58)
at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:589)
at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:585)
at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48)
at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:209)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at StreamExecCalc$154.processElement(Unknown Source)

We are using flink-sql-connector-hive-2.3.6_2.11-1.12-SNAPSHOT.jar, and our company's Hive is also this version. What could be causing this?
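
A NoSuchMethodError with the descriptor serialize64(I)J means the JVM looked for a method taking an int and returning a long on the class it actually loaded, and did not find it. A small reflection check like the one below, run with the same classpath as the job, may help show which jar the class comes from. This is a generic diagnostic sketch, not something taken from Flink or Hive; MethodPresenceCheck and check are hypothetical names, and the check only considers public methods.

```java
import java.lang.reflect.Method;

// Hypothetical diagnostic: reports whether a class on the current classpath
// exposes a public method with the expected parameter and return types.
public class MethodPresenceCheck {

    public static String check(String className, String methodName,
                               Class<?> returnType, Class<?>... paramTypes) {
        try {
            Class<?> clazz = Class.forName(className);
            Method method = clazz.getMethod(methodName, paramTypes);
            if (!method.getReturnType().equals(returnType)) {
                return "WRONG_RETURN_TYPE";
            }
            // The code source is the jar or directory the class was loaded from
            // (null for classes loaded by the bootstrap class loader).
            return "FOUND in " + clazz.getProtectionDomain().getCodeSource();
        } catch (ClassNotFoundException e) {
            return "CLASS_NOT_FOUND";
        } catch (NoSuchMethodException e) {
            return "METHOD_NOT_FOUND";
        }
    }

    public static void main(String[] args) {
        // serialize64(I)J: one int parameter, long return type.
        System.out.println(check(
                "org.apache.hadoop.hive.serde2.io.HiveDecimalWritable",
                "serialize64", long.class, int.class));
    }
}
```

If the result is METHOD_NOT_FOUND, the class is being loaded from a jar that does not provide the method the ORC writer expects, which would be consistent with the error above.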

Parallelism of Flink writing to Hive can only be 1

2021-01-20 Post by 高函

Why can the parallelism of Flink writing to Hive currently only be set to 1?

Re: JobManager restarts periodically with 1.12.1 in K8s HA session mode

2021-01-20 Post by macdoor
Did you get it? Any findings?


