回复:flink1.11 set yarn slots failed

2020-07-16 Thread flinkcx
是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4


 原始邮件 
发件人: Zhou Zach
收件人: Flink user-zh mailing list
发送时间: 2020年7月16日(周四) 14:51
主题: flink1.11 set yarn slots failed


Hi all, 使用如下命令,设置Number of slots per TaskManager /opt/flink-1.11.0/bin/flink 
run-application -t yarn-application \ -Djobmanager.memory.process.size=1024m \ 
-Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 
发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
 of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task 
Slots的值,现在每次提交作业都是0

Re: flink1.11 set yarn slots failed

2020-07-16 Thread Yang Wang
-t是新引入的参数,是不支持以前的-yxxx参数的
你需要使用-Dtaskmanager.numberOfTaskSlots=4这样来设置

Zhou Zach  于2020年7月16日周四 下午2:51写道:

> Hi all,
>
>
> 使用如下命令,设置Number of slots per TaskManager
> /opt/flink-1.11.0/bin/flink run-application -t yarn-application \
> -Djobmanager.memory.process.size=1024m \
> -Dtaskmanager.memory.process.size=2048m \
>  -ys 4 \
>
>
> 发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
> of slots per TaskManager?
> 另外,有哪些方式可以增Flink UI中的大Available Task Slots的值,现在每次提交作业都是0


Re:回复:flink1.11 set yarn slots failed

2020-07-16 Thread Zhou Zach
-D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了

















在 2020-07-16 15:03:14,"flinkcx"  写道:
>是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4
>
>
> 原始邮件 
>发件人: Zhou Zach
>收件人: Flink user-zh mailing list
>发送时间: 2020年7月16日(周四) 14:51
>主题: flink1.11 set yarn slots failed
>
>
>Hi all, 使用如下命令,设置Number of slots per TaskManager /opt/flink-1.11.0/bin/flink 
>run-application -t yarn-application \ -Djobmanager.memory.process.size=1024m \ 
>-Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 
>发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
> of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task 
>Slots的值,现在每次提交作业都是0


Flink yarn session exception

2020-07-16 Thread Rainie Li
大佬们好,我是flink新手,正在用flink 1.9.1
Flink APP cannot run, APP log error,  想求教一下会是什么原因造成的,多谢🙏

2020-06-16 17:06:21,921 WARN  org.apache.flink.client.cli.CliFrontend
- Could not load CLI class
org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError:
org/apache/hadoop/yarn/exceptions/YarnException
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1185)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1145)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070)
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.exceptions.YarnException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 5 more
2020-06-16 17:06:21,980 INFO  org.apache.flink.core.fs.FileSystem
- Hadoop is not in the classpath/dependencies. The extended
set of supported File Systems via Hadoop is not available.


FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread wangl...@geekplus.com.cn
比如:

CREATE TABLE my_table (
  id BIGINT,
 first_name STRING,
 last_name STRING,
 email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);

最终解析 debezium-json 应该是  
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium 
下面的代码
但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?

谢谢,
王磊


wangl...@geekplus.com.cn 



Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Peihui He
Hi  Congxian,

不好意思,本来想准备下例子再回下邮件的,一直拖了这么久。
情况是你说的第2种。
同@chenxyz遇到的情况类似,日志可以参考chenxyz发的
http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html


按照chenxyz 的建议换了1.10.1版本后就没有问题了。

Best wishes.


Congxian Qiu  于2020年7月15日周三 下午1:04写道:

> Hi
>
> 我尝试理解一下:
> 1 你用 1.9 跑 wordcount 作业,然后执行了一些 checkpoint,然后停止作业,然后使用 1.10 从之前 1.9 的作业生成的
> checkpoint 恢复,发现恢复不了?
> 2 你用作业 1.10 跑 wordcount,然后遇到特定的 word 会抛异常,然后 failover,发现不能从 checkpoint 恢复?
>
> 你这里的问题是第 1 种还是第 2 种呢?
>
> 另外能否分享一下你的操作步骤以及出错时候的 taskmanager log 呢?
>
> Best,
> Congxian
>
>
> Peihui He  于2020年7月14日周二 下午2:46写道:
>
> > Hi Congxian,
> >
> > 这个错误是从1.9 升级到1.10 遇到的问题。用简单的wordcount 测试,自己根据特定word
> > 抛出runtimeException,就能够重现。flink on yarn 和 flink on k8s 都出现这个问题。1.10
> > 都不能从上次的checkpoint状态中恢复。不知道是不是1.10需要其他配置呢?
> >
> > Best wishes.
> >
> > Congxian Qiu  于2020年7月14日周二 下午1:54写道:
> >
> > > Hi
> > >
> > > 这个出错是从 1.9 升级到 1.10 遇到的问题,还是说 1.10 能正常跑了,然后跑着跑着 failover 了再次恢复的时候出错了呢?
> > > 另外你可以看下 tm log 看看有没有其他异常
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > Yun Tang  于2020年7月14日周二 上午11:57写道:
> > >
> > > > Hi Peihui
> > > >
> > > >
> > >
> >
> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
> > > > cause。
> > > >
> > > > [1]
> > > >
> > >
> >
> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
> > > >
> > > >
> > > > 祝好
> > > > 唐云
> > > > 
> > > > From: Peihui He 
> > > > Sent: Tuesday, July 14, 2020 10:42
> > > > To: user-zh@flink.apache.org 
> > > > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> > > >
> > > > hello,
> > > >
> > > > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
> > > >
> > > >
> > > > Caused by: java.nio.file.NoSuchFileException:
> > > >
> > > >
> > >
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> > > > ->
> > > >
> > >
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> > > >
> > > > 配置和1.9.2 一样:
> > > > state.backend: rocksdb
> > > > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> > > > state.savepoints.dir: hdfs:///flink/savepoints/wc/
> > > > state.backend.incremental: true
> > > >
> > > > 代码上都有
> > > >
> > > > env.enableCheckpointing(1);
> > > >
> > > >
> > >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > > > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> > > > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
> > > >
> > > >
> > > >   是1.10.0 需要做什么特别配置么?
> > > >
> > >
> >
>


flink1.11 Application 模式下启动失败问题

2020-07-16 Thread Hito Zhu
Hi all
我把作业提交模式从 yarn-cluster 换成 application 模式,启动失败,报两个错误:
1、java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.api.records.ResourceInformation
2、cannot assign instance of org.apache.commons.collections.map.LinkedMap to
field
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
of type org.apache.commons.collections.map.LinkedMap in instance of
com.tydic.tysc.core.flink.cal.v3.core.connector.kafka.source.KafkaTableSource$CustomerFlinkKafkaConsumer
在 yarn-cluster 下正常运行,请各位帮忙看下。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink1.11 Application 模式下启动失败问题

2020-07-16 Thread Hito Zhu
Hi all
我把作业提交模式从 yarn-cluster 换成 application 模式,启动失败,报两个错误:
1、java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.api.records.ResourceInformation
2、cannot assign instance of org.apache.commons.collections.map.LinkedMap to
field
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
of type org.apache.commons.collections.map.LinkedMap in instance of
com.tydic.tysc.core.flink.cal.v3.core.connector.kafka.source.KafkaTableSource$CustomerFlinkKafkaConsumer
在 yarn-cluster 下正常运行,请各位帮忙看下。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: [sql-client] 通过sql-client提交sql怎么设置checkpointing.interval

2020-07-16 Thread wangl...@geekplus.com.cn

直接在 flink-conf.yaml 文件中加配置
execution.checkpointing.interval: 6




wangl...@geekplus.com.cn 

 
Sender: Harold.Miao
Send Time: 2020-07-16 13:27
Receiver: user-zh
Subject: [sql-client] 通过sql-client提交sql怎么设置checkpointing.interval
hi flink users
 
通过sql-client提交sql怎么设置checkpointing.interval?
我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。
谢谢
 
 
 
-- 
 
Best Regards,
Harold Miao


Re: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Peihui He
Hi chenxyz,

我们遇到的问题应该是一样的,换了1.10.1 后就可以从checkpoint 中恢复了。🤗

Best wishes.

chenxyz  于2020年7月15日周三 下午9:53写道:

>
>
>
> Hello,
> Peihui,可以参考下是不是和这个问题类似?之前我在1.10.0也遇到过。
>
> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html#a2239
> 解决方式:
> 1. 使用hdfs作为状态后端不会报错
> 2. 升级至1.10.1使用rocksdb也不会出现该问题
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-14 14:41:53,"Peihui He"  写道:
> >Hi Yun,
> >
> >我这边用一个word count 例子,socket -> flatmap -> keyBy -> reduce ->
> >print. 在flatmap 中当出现特定word的时候就抛出一个runtimeException。在1.9.2
> >里面是可以从checkpoint中自动恢复上次做checkpoint的时候的状态,但是用1.10.0 就不能。环境是flink on
> >yarn。
> >
> >Best wishes.
> >
> >Yun Tang  于2020年7月14日周二 上午11:57写道:
> >
> >> Hi Peihui
> >>
> >>
> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
> >> cause。
> >>
> >> [1]
> >>
> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
> >>
> >>
> >> 祝好
> >> 唐云
> >> 
> >> From: Peihui He 
> >> Sent: Tuesday, July 14, 2020 10:42
> >> To: user-zh@flink.apache.org 
> >> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >>
> >> hello,
> >>
> >> 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
> >>
> >>
> >> Caused by: java.nio.file.NoSuchFileException:
> >>
> >>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> >> ->
> >>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> >>
> >> 配置和1.9.2 一样:
> >> state.backend: rocksdb
> >> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> >> state.savepoints.dir: hdfs:///flink/savepoints/wc/
> >> state.backend.incremental: true
> >>
> >> 代码上都有
> >>
> >> env.enableCheckpointing(1);
> >>
> >>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> >> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
> >>
> >>
> >>   是1.10.0 需要做什么特别配置么?
> >>
>


Re: Flink yarn session exception

2020-07-16 Thread Paul Lam
日志里说得比较清楚了,classpath 里没有 Hadoop 的 lib。可以参考这个文档 [1] 来配置你的环境。

1. 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
 

Best,
Paul Lam

> 2020年7月16日 15:46,Rainie Li  写道:
> 
> 大佬们好,我是flink新手,正在用flink 1.9.1
> Flink APP cannot run, APP log error,  想求教一下会是什么原因造成的,多谢🙏
> 
> 2020-06-16 17:06:21,921 WARN  org.apache.flink.client.cli.CliFrontend
>- Could not load CLI class
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.
> java.lang.NoClassDefFoundError:
> org/apache/hadoop/yarn/exceptions/YarnException
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1185)
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1145)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.yarn.exceptions.YarnException
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 5 more
> 2020-06-16 17:06:21,980 INFO  org.apache.flink.core.fs.FileSystem
>- Hadoop is not in the classpath/dependencies. The extended
> set of supported File Systems via Hadoop is not available.



Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Peihui He
Hi Yun,

不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
输入的特定的word抛出runtimeexception 使task
失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报

Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
-> 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst

情况和@chenxyz 类似。
http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html

换成1.10.1 就可以了

Best wishes.

Yun Tang  于2020年7月15日周三 下午4:35写道:

> Hi Robin
>
> 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
> schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
>
> 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root
> cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>
> 祝好
> 唐云
>
>
> 
> From: Robin Zhang 
> Sent: Wednesday, July 15, 2020 16:23
> To: user-zh@flink.apache.org 
> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
>
> Best
> Robin Zhang
> 
> From: Peihui He <[hidden email]>
> Sent: Tuesday, July 14, 2020 10:42
> To: [hidden email] <[hidden email]>
> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> hello,
>
> 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>
>
> Caused by: java.nio.file.NoSuchFileException:
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> ->
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>
> 配置和1.9.2 一样:
> state.backend: rocksdb
> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> state.savepoints.dir: hdfs:///flink/savepoints/wc/
> state.backend.incremental: true
>
> 代码上都有
>
> env.enableCheckpointing(1);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>
>
>   是1.10.0 需要做什么特别配置么?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re:回复:flink1.11 set yarn slots failed

2020-07-16 Thread xiao cai
可以看这里


 原始邮件 
发件人: Zhou Zach
收件人: user-zh
发送时间: 2020年7月16日(周四) 15:28
主题: Re:回复:flink1.11 set yarn slots failed


-D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了 在 2020-07-16 15:03:14,"flinkcx" 
 写道: >是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4 > > 
> 原始邮件 >发件人: Zhou Zach >收件人: Flink user-zh mailing 
list >发送时间: 2020年7月16日(周四) 14:51 >主题: flink1.11 set 
yarn slots failed > > >Hi all, 使用如下命令,设置Number of slots per TaskManager 
/opt/flink-1.11.0/bin/flink run-application -t yarn-application \ 
-Djobmanager.memory.process.size=1024m \ 
-Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 
发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
 of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task 
Slots的值,现在每次提交作业都是0

Re:回复:flink1.11 set yarn slots failed

2020-07-16 Thread xiao cai
可以看这里https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

 原始邮件 
发件人: Zhou Zach
收件人: user-zh
发送时间: 2020年7月16日(周四) 15:28
主题: Re:回复:flink1.11 set yarn slots failed


-D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了 在 2020-07-16 15:03:14,"flinkcx" 
 写道: >是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4 > > 
> 原始邮件 >发件人: Zhou Zach >收件人: Flink user-zh mailing 
list >发送时间: 2020年7月16日(周四) 14:51 >主题: flink1.11 set 
yarn slots failed > > >Hi all, 使用如下命令,设置Number of slots per TaskManager 
/opt/flink-1.11.0/bin/flink run-application -t yarn-application \ 
-Djobmanager.memory.process.size=1024m \ 
-Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 
发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
 of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task 
Slots的值,现在每次提交作业都是0

flink1.11 Application 模式下启动失败问题

2020-07-16 Thread Hito Zhu
Hi all
我把作业提交模式从 yarn-cluster 换成 application 模式,启动失败,报两个错误:
1、java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.api.records.ResourceInformation
2、cannot assign instance of org.apache.commons.collections.map.LinkedMap to
field
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
of type org.apache.commons.collections.map.LinkedMap in instance of
com.tydic.tysc.core.flink.cal.v3.core.connector.kafka.source.KafkaTableSource$CustomerFlinkKafkaConsumer
在 yarn-cluster 下正常运行,请各位帮忙看下。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink1.11 Application 模式下启动失败问题

2020-07-16 Thread Hito Zhu
Hi all
我把作业提交模式从 yarn-cluster 换成 application 模式,启动失败,报两个错误:
1、java.lang.ClassNotFoundException:
org.apache.hadoop.yarn.api.records.ResourceInformation
2、cannot assign instance of org.apache.commons.collections.map.LinkedMap to
field
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit
of type org.apache.commons.collections.map.LinkedMap in instance of
com.tydic.tysc.core.flink.cal.v3.core.connector.kafka.source.KafkaTableSource$CustomerFlinkKafkaConsumer
在 yarn-cluster 下正常运行,请各位帮忙看下。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re:回复:flink1.11 set yarn slots failed

2020-07-16 Thread Zhou Zach
nice, 可以不用看Command-Line Interface的文档了

















在 2020-07-16 16:16:00,"xiao cai"  写道:
>可以看这里https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html
>
> 原始邮件 
>发件人: Zhou Zach
>收件人: user-zh
>发送时间: 2020年7月16日(周四) 15:28
>主题: Re:回复:flink1.11 set yarn slots failed
>
>
>-D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了 在 2020-07-16 15:03:14,"flinkcx" 
> 写道: >是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4 > > 
>> 原始邮件 >发件人: Zhou Zach >收件人: Flink user-zh mailing 
>list >发送时间: 2020年7月16日(周四) 14:51 >主题: flink1.11 set 
>yarn slots failed > > >Hi all, 使用如下命令,设置Number of slots per TaskManager 
>/opt/flink-1.11.0/bin/flink run-application -t yarn-application \ 
>-Djobmanager.memory.process.size=1024m \ 
>-Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 
>发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
> of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task 
>Slots的值,现在每次提交作业都是0


Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread godfrey he
通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors

Best,
Godfrey

wangl...@geekplus.com.cn  于2020年7月16日周四 下午4:02写道:

> 比如:
>
> CREATE TABLE my_table (
>   id BIGINT,
>  first_name STRING,
>  last_name STRING,
>  email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
>
> 最终解析 debezium-json 应该是
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> 下面的代码
> 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Yun Tang
Hi Peihui

Flink-1.10.1 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?


[1] 
https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
祝好
唐云

From: Peihui He 
Sent: Thursday, July 16, 2020 16:15
To: user-zh@flink.apache.org 
Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

Hi Yun,

不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
输入的特定的word抛出runtimeexception 使task
失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报

Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
-> 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst

情况和@chenxyz 类似。
http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html

换成1.10.1 就可以了

Best wishes.

Yun Tang  于2020年7月15日周三 下午4:35写道:

> Hi Robin
>
> 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
> schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
>
> 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root
> cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>
> 祝好
> 唐云
>
>
> 
> From: Robin Zhang 
> Sent: Wednesday, July 15, 2020 16:23
> To: user-zh@flink.apache.org 
> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
>
> Best
> Robin Zhang
> 
> From: Peihui He <[hidden email]>
> Sent: Tuesday, July 14, 2020 10:42
> To: [hidden email] <[hidden email]>
> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> hello,
>
> 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>
>
> Caused by: java.nio.file.NoSuchFileException:
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> ->
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>
> 配置和1.9.2 一样:
> state.backend: rocksdb
> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> state.savepoints.dir: hdfs:///flink/savepoints/wc/
> state.backend.incremental: true
>
> 代码上都有
>
> env.enableCheckpointing(1);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>
>
>   是1.10.0 需要做什么特别配置么?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


来自李国鹏的邮件

2020-07-16 Thread 李国鹏
退订

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Peihui He
Hi Yun,

作业没有开启local recovery, 我这边测试1.10.0是必现的。

Best wishes.

Yun Tang  于2020年7月16日周四 下午5:04写道:

> Hi Peihui
>
> Flink-1.10.1
> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
>
>
> [1]
> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
> 祝好
> 唐云
> 
> From: Peihui He 
> Sent: Thursday, July 16, 2020 16:15
> To: user-zh@flink.apache.org 
> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> Hi Yun,
>
> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
> 输入的特定的word抛出runtimeexception 使task
> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
>
> Caused by: java.nio.file.NoSuchFileException:
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> ->
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>
> 情况和@chenxyz 类似。
>
> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html
>
> 换成1.10.1 就可以了
>
> Best wishes.
>
> Yun Tang  于2020年7月15日周三 下午4:35写道:
>
> > Hi Robin
> >
> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
> > [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
> >
> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root
> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
> >
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
> > [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
> >
> > 祝好
> > 唐云
> >
> >
> > 
> > From: Robin Zhang 
> > Sent: Wednesday, July 15, 2020 16:23
> > To: user-zh@flink.apache.org 
> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >
> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
> >
> > Best
> > Robin Zhang
> > 
> > From: Peihui He <[hidden email]>
> > Sent: Tuesday, July 14, 2020 10:42
> > To: [hidden email] <[hidden email]>
> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >
> > hello,
> >
> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
> >
> >
> > Caused by: java.nio.file.NoSuchFileException:
> >
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> > ->
> >
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> >
> > 配置和1.9.2 一样:
> > state.backend: rocksdb
> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> > state.savepoints.dir: hdfs:///flink/savepoints/wc/
> > state.backend.incremental: true
> >
> > 代码上都有
> >
> > env.enableCheckpointing(1);
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
> >
> >
> >   是1.10.0 需要做什么特别配置么?
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Peihui He
Hi Yun,

我这边测试需要在集群上跑的,本地idea跑是没有问题的。
flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。

附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。

Peihui He  于2020年7月16日周四 下午5:26写道:

> Hi Yun,
>
> 作业没有开启local recovery, 我这边测试1.10.0是必现的。
>
> Best wishes.
>
> Yun Tang  于2020年7月16日周四 下午5:04写道:
>
>> Hi Peihui
>>
>> Flink-1.10.1
>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
>>
>>
>> [1]
>> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
>> 祝好
>> 唐云
>> 
>> From: Peihui He 
>> Sent: Thursday, July 16, 2020 16:15
>> To: user-zh@flink.apache.org 
>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>>
>> Hi Yun,
>>
>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
>> 输入的特定的word抛出runtimeexception 使task
>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
>>
>> Caused by: java.nio.file.NoSuchFileException:
>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>> ->
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>>
>> 情况和@chenxyz 类似。
>>
>> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html
>>
>> 换成1.10.1 就可以了
>>
>> Best wishes.
>>
>> Yun Tang  于2020年7月15日周三 下午4:35写道:
>>
>> > Hi Robin
>> >
>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
>> > [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
>> >
>> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root
>> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
>> >
>> >
>> > [1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>> > [2]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>> >
>> > 祝好
>> > 唐云
>> >
>> >
>> > 
>> > From: Robin Zhang 
>> > Sent: Wednesday, July 15, 2020 16:23
>> > To: user-zh@flink.apache.org 
>> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >
>> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
>> >
>> > Best
>> > Robin Zhang
>> > 
>> > From: Peihui He <[hidden email]>
>> > Sent: Tuesday, July 14, 2020 10:42
>> > To: [hidden email] <[hidden email]>
>> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >
>> > hello,
>> >
>> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>> >
>> >
>> > Caused by: java.nio.file.NoSuchFileException:
>> >
>> >
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>> > ->
>> >
>> >
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>> >
>> > 配置和1.9.2 一样:
>> > state.backend: rocksdb
>> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
>> > state.savepoints.dir: hdfs:///flink/savepoints/wc/
>> > state.backend.incremental: true
>> >
>> > 代码上都有
>> >
>> > env.enableCheckpointing(1);
>> >
>> >
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
>> > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>> >
>> >
>> >   是1.10.0 需要做什么特别配置么?
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-flink.147419.n8.nabble.com/
>> >
>>
>


Flink on k8s 中,Jar 任务 avatica-core 依赖和 flink-table jar 冲突问题

2020-07-16 Thread LakeShen
Hi 社区,

我现在正在迁移任务到 k8s ,目前版本为 Flink 1.6 版本,k8s 上面作业运行模式为 standalone per job.

现在遇到一个问题,业务方 Flink jar 任务使用了 org.apache.calcite.avatica 依赖,也就是下面依赖:

org.apache.calcite.avatica
avatica-core
${avatica.version}


但是这个依赖其实在 flink-table 模块中,也有这个依赖:
[image: image.png]

由于 flink on k8s  standalone per job 模式,会把 Flink 任务 jar 包放入到 flink 本身的lib
包中,我在任务启动的时候,就会报:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.apache.calcite.avatica.ConnectionPropertiesImpl 错误。

按照我的理解,由于 Flink jar 任务包中有 avatica-core 依赖,同时在 flink lib
目录下面,flink-table_2.11-1.6-RELEASE.jar 中也有这个依赖,这两个都在 lib 目录下,然后就出现了类冲突问题。

请问怎么解决这个问题呢,非常期待你的回复。

Best,
LakeShen


Flink 1.11 Hive Streaming Write的问题

2020-07-16 Thread 李佳宸
想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
批量的hive写入,流环境的读取是正常的。

附代码,很简短:

public class KafkaToHiveStreaming {
public static void main(String[] arg) throws Exception{
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
String name= "myhive";
String defaultDatabase = "default";
String hiveConfDir =
"/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
path
String version = "3.1.2";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
bsTableEnv.registerCatalog("myhive", hive);
bsTableEnv.useCatalog("myhive");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
bsTableEnv.executeSql("CREATE TABLE topic_products (" +
"  id BIGINT ," +
"  order_id STRING," +
"  amount DECIMAL(10, 2)," +
"  create_time TIMESTAMP " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'order.test'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'properties.group.id' = 'testGroup'," +
" 'scan.startup.mode' = 'earliest-offset', " +
" 'format' = 'json'  " +
")");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
"  id BIGINT ," +
"  order_id STRING," +
"  amount DECIMAL(10, 2)" +
"  )");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
bsTableEnv.executeSql("CREATE TABLE print_table WITH
('connector' = 'print')" +
"LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)");

bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " +
"id, " +
"order_id, " +
"amount " +
"FROM topic_products");

Table table1 = bsTableEnv.from("hive_sink_table_streaming");
table1.executeInsert("print_table");
}
}


state??????checkpoint??????

2020-07-16 Thread sun
env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//
env.setRestartStrategy(RestartStrategies.noRestart());
env.getCheckpointConfig().setCheckpointTimeout(500);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new 
RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
??private transient ListState

flink1.10升级到flink1.11 jar冲突

2020-07-16 Thread xyq
hello  大家好
 我在flink由1.10升级到1.11过程中遇到如下问题,请问是哪个包冲突了(本地可跑,上测试环境就报错),谢谢:


Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:329)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.io.IOException: Could not find class 
'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
 in classpath.
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:721)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:187)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 15 more
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:718)



Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread wangl...@geekplus.com.cn

我在 
flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 找到了 SPI 的配置:

org.apache.flink.formats.json.JsonFileSystemFormatFactory
org.apache.flink.formats.json.JsonFormatFactory
org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
org.apache.flink.formats.json.canal.CanalJsonFormatFactory

还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep 
代码没找到类似的关系映射配置。


谢谢,
王磊



wangl...@geekplus.com.cn 

 
Sender: godfrey he
Send Time: 2020-07-16 16:38
Receiver: user-zh
Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
 
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
 
Best,
Godfrey
 
wangl...@geekplus.com.cn  于2020年7月16日周四 下午4:02写道:
 
> 比如:
>
> CREATE TABLE my_table (
>   id BIGINT,
>  first_name STRING,
>  last_name STRING,
>  email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
>
> 最终解析 debezium-json 应该是
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> 下面的代码
> 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
>
> 谢谢,
> 王磊
>
>
> wangl...@geekplus.com.cn
>
>


flink state????

2020-07-16 Thread op
?? 
??bloomfilter
 .keyBy(_._1).process(new KeyedProcessFunction[String,(String,String),String]() 
{
  var state:ValueState[BloomFilter[CharSequence]]= null
  override def open(parameters: Configuration): Unit = {
val stateDesc = new ValueStateDescriptor("state",TypeInformation.of(new 
TypeHint[BloomFilter[CharSequence]](){}))
state = getRuntimeContext.getState(stateDesc)
  }
  override def processElement(value: (String, String), ctx: 
KeyedProcessFunction[String, (String, String), String]#Context, out: 
Collector[String]) = {

var filter = state.value
if(filter==null){
  println("null filter")
  filter=  
BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel,1,0.0001)}
//val contains = filter.mightContain(value._2)
if(!filter.mightContain(value._2)) {
  filter.put(value._2)
  state.update(filter)
  out.collect(value._2)

}

  }
})
??savepoint??state??bloomfilternull??

Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread Harold.Miao
我的理解 :  大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class   类似下面的代码

private static  T findSingleInternal(
  Class factoryClass,
  Map properties,
  Optional classLoader) {

   List tableFactories = discoverFactories(classLoader);
   List filtered = filter(tableFactories, factoryClass, properties);

   if (filtered.size() > 1) {
  throw new AmbiguousTableFactoryException(
 filtered,
 factoryClass,
 tableFactories,
 properties);
   } else {
  return filtered.get(0);
   }
}

private static List
discoverFactories(Optional classLoader) {
   try {
  List result = new LinkedList<>();
  ClassLoader cl =
classLoader.orElse(Thread.currentThread().getContextClassLoader());
  ServiceLoader
 .load(TableFactory.class, cl)
 .iterator()
 .forEachRemaining(result::add);
  return result;
   } catch (ServiceConfigurationError e) {
  LOG.error("Could not load service provider for table factories.", e);
  throw new TableException("Could not load service provider for
table factories.", e);
   }

}


wangl...@geekplus.com.cn  于2020年7月16日周四 下午7:04写道:

>
> 我在
> flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> 找到了 SPI 的配置:
>
> org.apache.flink.formats.json.JsonFileSystemFormatFactory
> org.apache.flink.formats.json.JsonFormatFactory
> org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
> org.apache.flink.formats.json.canal.CanalJsonFormatFactory
>
> 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
> 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep
> 代码没找到类似的关系映射配置。
>
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: godfrey he
> Send Time: 2020-07-16 16:38
> Receiver: user-zh
> Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
>
> Best,
> Godfrey
>
> wangl...@geekplus.com.cn  于2020年7月16日周四
> 下午4:02写道:
>
> > 比如:
> >
> > CREATE TABLE my_table (
> >   id BIGINT,
> >  first_name STRING,
> >  last_name STRING,
> >  email STRING
> > ) WITH (
> >  'connector'='kafka',
> >  'topic'='user_topic',
> >  'properties.bootstrap.servers'='localhost:9092',
> >  'scan.startup.mode'='earliest-offset',
> >  'format'='debezium-json'
> > );
> >
> > 最终解析 debezium-json 应该是
> >
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> > 下面的代码
> > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
> >
> > 谢谢,
> > 王磊
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
>


-- 

Best Regards,
Harold Miao


Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread Leonard Xu
Hi,

> 在 2020年7月16日,19:04,wangl...@geekplus.com.cn 写道:
> 
> 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep 
> 代码没找到类似的关系映射配置。

你DDL中不是写了 ‘format’ = ‘debzium-json’ 吗?就是这里指明的。

[sql-client] 如何绕过交互式模式去做ddl

2020-07-16 Thread Harold.Miao
hi flink users

众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,
这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!


-- 

Best Regards,
Harold Miao


Re: flink state问题

2020-07-16 Thread Congxian Qiu
Hi

你可以尝试用 state-process-api[1] 看一下 savepoint 中 state 的内容,先缩小一下问题的范围,如果
savepoint 中就没有了,那就是序列化到 savepoint 的时候出错了,savepoitn 是有的,那么就是恢复的时候出错了。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html

Best,
Congxian


op <520075...@qq.com> 于2020年7月16日周四 下午7:16写道:

> 大家好 
> 我有一个去重的需求,想节省内存用的bloomfilter,代码如下:
>  .keyBy(_._1).process(new
> KeyedProcessFunction[String,(String,String),String]() {
>   var state:ValueState[BloomFilter[CharSequence]]= null
>   override def open(parameters: Configuration): Unit = {
> val stateDesc = new
> ValueStateDescriptor("state",TypeInformation.of(new
> TypeHint[BloomFilter[CharSequence]](){}))
> state = getRuntimeContext.getState(stateDesc)
>   }
>   override def processElement(value: (String, String), ctx:
> KeyedProcessFunction[String, (String, String), String]#Context, out:
> Collector[String]) = {
>
> var filter = state.value
> if(filter==null){
>   println("null filter")
>   filter=
> BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel,1,0.0001)}
> //val contains = filter.mightContain(value._2)
> if(!filter.mightContain(value._2)) {
>   filter.put(value._2)
>   state.update(filter)
>   out.collect(value._2)
>
> }
>
>   }
> })
> 通过日志我看到每次我从savepoint恢复的时候这个state里面的bloomfilter都是null,这是为什么啊


Re: 【求助】flink打包到集群运行问题

2020-07-16 Thread Congxian Qiu
Hi

图片的文字太小了,可以看一下这个邮件[1],应该是一个问题,按理在 google 能够搜索到这个邮件列表的

[1]
http://apache-flink.147419.n8.nabble.com/Could-not-find-a-suitable-table-factory-for-TableSourceFactory-td3287.html
Best,
Congxian


徐粟  于2020年7月16日周四 下午8:02写道:

>
>
> 下面是被转发的邮件:
>
> *发件人: *徐粟 
> *主题: **【求助】flink打包到集群运行问题*
> *日期: *2020年7月16日 GMT+8 下午7:51:06
> *收件人: *user-zh@flink.apache.org
>
> hi ,please help me
> 我打包到集群之后,产生了如下图错误。
> flink版本是1.10.1 jar包是flnk-1.10.1-bin-scala_2.12.taz
> 命令在图片里面。thanks
>
>
>


Re: state无法从checkpoint中恢复

2020-07-16 Thread Congxian Qiu
Hi

1 counts 的数据丢失了能否详细描述一下呢?你预期是什么,看到什么现象
2 能否把你关于 counts 的其他代码也贴一下
3. 你的作业是否从 checkpoint 恢复了呢?这个可以从 JM log 来查看
4. 如果你确定是数据有丢失的话,或许你可以使用 state-process-api[1] 看一下是序列化出去有问题,还是 restore 回来有问题

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian


sun <1392427...@qq.com> 于2020年7月16日周四 下午6:16写道:

>
> 配置代码env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> //作业失败后不重启
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.getCheckpointConfig().setCheckpointTimeout(500);
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setStateBackend(new
> RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
>   使用状态的代码private transient ListState
>
> @Override
> public void open(Configuration parameters) throws Exception {
> StateTtlConfig ttlConfig = StateTtlConfig
> .newBuilder(Time.minutes(30))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> .build();
>
> ListStateDescriptor ListStateDescriptor<>("lastUserLogin", String.class);
> lastUserLogin.enableTimeToLive(ttlConfig);
> counts = getRuntimeContext().getListState(lastUserLogin);
> }
> 我重启了task managers 后。发现  counts  里面的数据都丢失了


退订

2020-07-16 Thread 李国鹏
退订

Re: Flink on k8s 中,Jar 任务 avatica-core 依赖和 flink-table jar 冲突问题

2020-07-16 Thread Congxian Qiu
Hi

你的图挂了,如果单纯想解决 jar 包冲突的问题,那么 maven shade plugin[1] 或许对你有用

[1]
https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html
Best,
Congxian


LakeShen  于2020年7月16日周四 下午6:03写道:

> Hi 社区,
>
> 我现在正在迁移任务到 k8s ,目前版本为 Flink 1.6 版本,k8s 上面作业运行模式为 standalone per job.
>
> 现在遇到一个问题,业务方 Flink jar 任务使用了 org.apache.calcite.avatica 依赖,也就是下面依赖:
> 
> org.apache.calcite.avatica
> avatica-core
> ${avatica.version}
> 
>
> 但是这个依赖其实在 flink-table 模块中,也有这个依赖:
> [image: image.png]
>
> 由于 flink on k8s  standalone per job 模式,会把 Flink 任务 jar 包放入到 flink 本身的lib
> 包中,我在任务启动的时候,就会报:
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.calcite.avatica.ConnectionPropertiesImpl 错误。
>
> 按照我的理解,由于 Flink jar 任务包中有 avatica-core 依赖,同时在 flink lib
> 目录下面,flink-table_2.11-1.6-RELEASE.jar 中也有这个依赖,这两个都在 lib 目录下,然后就出现了类冲突问题。
>
> 请问怎么解决这个问题呢,非常期待你的回复。
>
> Best,
> LakeShen
>
>
>


Re: 退订

2020-07-16 Thread Leonard Xu
Hi,
是指取消订阅邮件吗?
可以发送任意内容的邮件到  user-zh-unsubscr...@flink.apache.org  取消订阅来自 
user-zh@flink.apache.org 邮件组的邮件

邮件组的订阅管理,可以参考[1]

祝好,
Leonard Xu
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list 
 

> 在 2020年7月16日,20:18,李国鹏  写道:
> 
> 退订



回复:Flink 1.11 Hive Streaming Write的问题

2020-07-16 Thread JasonLee
hi
需要开启checkpoint


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月16日 18:03,李佳宸 写道:
想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
批量的hive写入,流环境的读取是正常的。

附代码,很简短:

public class KafkaToHiveStreaming {
   public static void main(String[] arg) throws Exception{
   StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
   EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
   StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
   String name= "myhive";
   String defaultDatabase = "default";
   String hiveConfDir =
"/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
path
   String version = "3.1.2";

   HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
   bsTableEnv.registerCatalog("myhive", hive);
   bsTableEnv.useCatalog("myhive");
   bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
   bsTableEnv.executeSql("CREATE TABLE topic_products (" +
   "  id BIGINT ," +
   "  order_id STRING," +
   "  amount DECIMAL(10, 2)," +
   "  create_time TIMESTAMP " +
   ") WITH (" +
   " 'connector' = 'kafka'," +
   " 'topic' = 'order.test'," +
   " 'properties.bootstrap.servers' = 'localhost:9092'," +
   " 'properties.group.id' = 'testGroup'," +
   " 'scan.startup.mode' = 'earliest-offset', " +
   " 'format' = 'json'  " +
   ")");
   bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

   bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
   "  id BIGINT ," +
   "  order_id STRING," +
   "  amount DECIMAL(10, 2)" +
   "  )");
   bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
   bsTableEnv.executeSql("CREATE TABLE print_table WITH
('connector' = 'print')" +
   "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)");

   bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
   bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " +
   "id, " +
   "order_id, " +
   "amount " +
   "FROM topic_products");

   Table table1 = bsTableEnv.from("hive_sink_table_streaming");
   table1.executeInsert("print_table");
   }
}


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Congxian Qiu
Hi Peihui

感谢你的回信。能否帮忙用 1.10.0 复现一次,然后把相关的日志(JM log 和 TM Log,方便的话,也开启一下 debug
日志)分享一下呢?如果日志太大的话,可以尝试贴待 gist[1] 然后邮件列表回复一个地址即可,
非常感谢~

[1] https://gist.github.com/

Best,
Congxian


Peihui He  于2020年7月16日周四 下午5:54写道:

> Hi Yun,
>
> 我这边测试需要在集群上跑的,本地idea跑是没有问题的。
> flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。
>
> 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。
>
> Peihui He  于2020年7月16日周四 下午5:26写道:
>
>> Hi Yun,
>>
>> 作业没有开启local recovery, 我这边测试1.10.0是必现的。
>>
>> Best wishes.
>>
>> Yun Tang  于2020年7月16日周四 下午5:04写道:
>>
>>> Hi Peihui
>>>
>>> Flink-1.10.1
>>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
>>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
>>> 祝好
>>> 唐云
>>> 
>>> From: Peihui He 
>>> Sent: Thursday, July 16, 2020 16:15
>>> To: user-zh@flink.apache.org 
>>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>>>
>>> Hi Yun,
>>>
>>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
>>> 输入的特定的word抛出runtimeexception 使task
>>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
>>>
>>> Caused by: java.nio.file.NoSuchFileException:
>>>
>>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>>> ->
>>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>>>
>>> 情况和@chenxyz 类似。
>>>
>>> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html
>>>
>>> 换成1.10.1 就可以了
>>>
>>> Best wishes.
>>>
>>> Yun Tang  于2020年7月15日周三 下午4:35写道:
>>>
>>> > Hi Robin
>>> >
>>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
>>> >
>>> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
>>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
>>> >
>>> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root
>>> cause,还请在日志中找一下无法恢复的root
>>> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
>>> >
>>> >
>>> > [1]
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>>> > [2]
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>>> >
>>> > 祝好
>>> > 唐云
>>> >
>>> >
>>> > 
>>> > From: Robin Zhang 
>>> > Sent: Wednesday, July 15, 2020 16:23
>>> > To: user-zh@flink.apache.org 
>>> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>>> >
>>> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
>>> >
>>> > Best
>>> > Robin Zhang
>>> > 
>>> > From: Peihui He <[hidden email]>
>>> > Sent: Tuesday, July 14, 2020 10:42
>>> > To: [hidden email] <[hidden email]>
>>> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>>> >
>>> > hello,
>>> >
>>> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>>> >
>>> >
>>> > Caused by: java.nio.file.NoSuchFileException:
>>> >
>>> >
>>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>>> > ->
>>> >
>>> >
>>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>>> >
>>> > 配置和1.9.2 一样:
>>> > state.backend: rocksdb
>>> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
>>> > state.savepoints.dir: hdfs:///flink/savepoints/wc/
>>> > state.backend.incremental: true
>>> >
>>> > 代码上都有
>>> >
>>> > env.enableCheckpointing(1);
>>> >
>>> >
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>> > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
>>> > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>>> >
>>> >
>>> >   是1.10.0 需要做什么特别配置么?
>>> >
>>> >
>>> >
>>> > --
>>> > Sent from: http://apache-flink.147419.n8.nabble.com/
>>> >
>>>
>>


flink 1.11 checkpoint使用

2020-07-16 Thread 曹武
我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
从checkpoint恢复以后,新来op=d的数据会删除失败
重启命令:./bin/flink run -m yarn-cluster  /root/bigdata-flink-1.0.jar -s
hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
代码:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //
最大允许同时出现几个CheckPoint
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); //
最小得间隔时间
env.getCheckpointConfig().setPreferCheckpointForRecovery(true); //
是否倾向于用CheckPoint做故障恢复
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); //
容忍多少次CheckPoint失败
//Checkpoint文件清理策略
   
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//Checkpoint外部文件路径
env.setStateBackend(new FsStateBackend(new
URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
settings);
String sourceDDL = String.format(
"CREATE TABLE debezium_source (" +
" id INT NOT NULL," +
" name STRING," +
" description STRING," +
" weight Double" +
") WITH (" +
" 'connector' = 'kafka-0.11'," +
" 'topic' = '%s'," +
" 'properties.bootstrap.servers' = '%s'," +
" 'scan.startup.mode' = 'group-offsets'," +
" 'format' = 'debezium-json'" +
")", "ddd", " 172.22.20.206:9092");
String sinkDDL = "CREATE TABLE sink (" +
" id INT NOT NULL," +
" name STRING," +
" description STRING," +
" weight Double," +
" PRIMARY KEY (id,name, description,weight) NOT ENFORCED " +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' =
'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
" 'table-name' = 'products'," +
" 'driver'= 'com.mysql.cj.jdbc.Driver'," +
" 'username'='DataPip'," +
" 'password'='DataPip'" +
")";
String dml = "INSERT INTO sink SELECT  id,name ,description, weight
FROM debezium_source GROUP BY id,name ,description, weight";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
tEnv.executeSql(dml);



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11 checkpoint使用

2020-07-16 Thread godfrey he
为什么要 GROUP BY id,name ,description, weight ?
直接 "INSERT INTO sink SELECT  id,name ,description, weight FROM
debezium_source" 不能满足需求?

曹武 <14701319...@163.com> 于2020年7月16日周四 下午9:30写道:

> 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
> 从checkpoint恢复以后,新来op=d的数据会删除失败
> 重启命令:./bin/flink run -m yarn-cluster  /root/bigdata-flink-1.0.jar -s
>
> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
> 代码:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //
> 最大允许同时出现几个CheckPoint
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); //
> 最小得间隔时间
> env.getCheckpointConfig().setPreferCheckpointForRecovery(true); //
> 是否倾向于用CheckPoint做故障恢复
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
> //
> 容忍多少次CheckPoint失败
> //Checkpoint文件清理策略
>
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> //Checkpoint外部文件路径
> env.setStateBackend(new FsStateBackend(new
> URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
> TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> settings);
> String sourceDDL = String.format(
> "CREATE TABLE debezium_source (" +
> " id INT NOT NULL," +
> " name STRING," +
> " description STRING," +
> " weight Double" +
> ") WITH (" +
> " 'connector' = 'kafka-0.11'," +
> " 'topic' = '%s'," +
> " 'properties.bootstrap.servers' = '%s'," +
> " 'scan.startup.mode' = 'group-offsets'," +
> " 'format' = 'debezium-json'" +
> ")", "ddd", " 172.22.20.206:9092");
> String sinkDDL = "CREATE TABLE sink (" +
> " id INT NOT NULL," +
> " name STRING," +
> " description STRING," +
> " weight Double," +
> " PRIMARY KEY (id,name, description,weight) NOT ENFORCED "
> +
> ") WITH (" +
> " 'connector' = 'jdbc'," +
> " 'url' =
> 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
> " 'table-name' = 'products'," +
> " 'driver'= 'com.mysql.cj.jdbc.Driver'," +
> " 'username'='DataPip'," +
> " 'password'='DataPip'" +
> ")";
> String dml = "INSERT INTO sink SELECT  id,name ,description, weight
> FROM debezium_source GROUP BY id,name ,description, weight";
> tEnv.executeSql(sourceDDL);
> tEnv.executeSql(sinkDDL);
> tEnv.executeSql(dml);
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?

2020-07-16 Thread wangl...@geekplus.com.cn

 谢谢,我理解了。



wangl...@geekplus.com.cn 

Sender: Harold.Miao
Send Time: 2020-07-16 19:33
Receiver: user-zh
Subject: Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
我的理解 :  大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class   类似下面的代码
 
private static  T findSingleInternal(
  Class factoryClass,
  Map properties,
  Optional classLoader) {
 
   List tableFactories = discoverFactories(classLoader);
   List filtered = filter(tableFactories, factoryClass, properties);
 
   if (filtered.size() > 1) {
  throw new AmbiguousTableFactoryException(
 filtered,
 factoryClass,
 tableFactories,
 properties);
   } else {
  return filtered.get(0);
   }
}
 
private static List
discoverFactories(Optional classLoader) {
   try {
  List result = new LinkedList<>();
  ClassLoader cl =
classLoader.orElse(Thread.currentThread().getContextClassLoader());
  ServiceLoader
 .load(TableFactory.class, cl)
 .iterator()
 .forEachRemaining(result::add);
  return result;
   } catch (ServiceConfigurationError e) {
  LOG.error("Could not load service provider for table factories.", e);
  throw new TableException("Could not load service provider for
table factories.", e);
   }
 
}
 
 
wangl...@geekplus.com.cn  于2020年7月16日周四 下午7:04写道:
 
>
> 我在
> flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> 找到了 SPI 的配置:
>
> org.apache.flink.formats.json.JsonFileSystemFormatFactory
> org.apache.flink.formats.json.JsonFormatFactory
> org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
> org.apache.flink.formats.json.canal.CanalJsonFormatFactory
>
> 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
> 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep
> 代码没找到类似的关系映射配置。
>
>
> 谢谢,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
> Sender: godfrey he
> Send Time: 2020-07-16 16:38
> Receiver: user-zh
> Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
>
> Best,
> Godfrey
>
> wangl...@geekplus.com.cn  于2020年7月16日周四
> 下午4:02写道:
>
> > 比如:
> >
> > CREATE TABLE my_table (
> >   id BIGINT,
> >  first_name STRING,
> >  last_name STRING,
> >  email STRING
> > ) WITH (
> >  'connector'='kafka',
> >  'topic'='user_topic',
> >  'properties.bootstrap.servers'='localhost:9092',
> >  'scan.startup.mode'='earliest-offset',
> >  'format'='debezium-json'
> > );
> >
> > 最终解析 debezium-json 应该是
> >
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> > 下面的代码
> > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
> >
> > 谢谢,
> > 王磊
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
>
 
 
-- 
 
Best Regards,
Harold Miao


Re: Flink 1.11 Hive Streaming Write的问题

2020-07-16 Thread 李佳宸
好的,谢谢~~~

JasonLee <17610775...@163.com> 于2020年7月16日周四 下午8:22写道:

> hi
> 需要开启checkpoint
>
>
> | |
> JasonLee
> |
> |
> 邮箱:17610775...@163.com
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年07月16日 18:03,李佳宸 写道:
> 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
> 批量的hive写入,流环境的读取是正常的。
>
> 附代码,很简短:
>
> public class KafkaToHiveStreaming {
>public static void main(String[] arg) throws Exception{
>StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings bsSettings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
>String name= "myhive";
>String defaultDatabase = "default";
>String hiveConfDir =
> "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> path
>String version = "3.1.2";
>
>HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> hiveConfDir, version);
>bsTableEnv.registerCatalog("myhive", hive);
>bsTableEnv.useCatalog("myhive");
>bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>bsTableEnv.executeSql("CREATE TABLE topic_products (" +
>"  id BIGINT ," +
>"  order_id STRING," +
>"  amount DECIMAL(10, 2)," +
>"  create_time TIMESTAMP " +
>") WITH (" +
>" 'connector' = 'kafka'," +
>" 'topic' = 'order.test'," +
>" 'properties.bootstrap.servers' = 'localhost:9092'," +
>" 'properties.group.id' = 'testGroup'," +
>" 'scan.startup.mode' = 'earliest-offset', " +
>" 'format' = 'json'  " +
>")");
>bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
>bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
>"  id BIGINT ," +
>"  order_id STRING," +
>"  amount DECIMAL(10, 2)" +
>"  )");
>bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>bsTableEnv.executeSql("CREATE TABLE print_table WITH
> ('connector' = 'print')" +
>"LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> ALL)");
>
>bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT
> " +
>"id, " +
>"order_id, " +
>"amount " +
>"FROM topic_products");
>
>Table table1 = bsTableEnv.from("hive_sink_table_streaming");
>table1.executeInsert("print_table");
>}
> }
>


????

2020-07-16 Thread ????


flink 1.11任务提交的问题

2020-07-16 Thread sunfulin
hi,
请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql 
dml提交(executeSQL执行),又通过DataStream.addSink来写出,
通过StreamExecutionEnvironment.execute提交,yarn 
per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。

Re: 退订

2020-07-16 Thread Leonard Xu
Hi,
是指取消订阅邮件吗?
可以发送任意内容的邮件到  user-zh-unsubscr...@flink.apache.org 
  取消订阅来自 user-zh@flink.apache.org 
 邮件组的邮件

邮件组的订阅管理,可以参考[1]

祝好,
Leonard Xu
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list 


> 在 2020年7月16日,22:46,沈阳 <122969...@qq.com> 写道:
> 
> 退订



Re: flink 1.11任务提交的问题

2020-07-16 Thread Leonard Xu
Hi,

我理解目前好像做不到, cc: godfrey 大佬看看

祝好,
Leonard Xu

> 在 2020年7月16日,23:08,sunfulin  写道:
> 
> hi,
> 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql 
> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
> 通过StreamExecutionEnvironment.execute提交,yarn 
> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。



Re: Flink yarn session exception

2020-07-16 Thread Rainie Li
多谢,我set了这些envs:

export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export PATH=$JAVA_HOME/bin:$PATH
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
export FLINK_CONF_DIR=/etc/flink-1.9.1/conf
export FLINK_LOG_DIR=/home/rainieli/

有什么问题吗?🙏


On Thu, Jul 16, 2020 at 1:12 AM Paul Lam  wrote:

> 日志里说得比较清楚了,classpath 里没有 Hadoop 的 lib。可以参考这个文档 [1] 来配置你的环境。
>
> 1.
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
> >
> Best,
> Paul Lam
>
> > 2020年7月16日 15:46,Rainie Li  写道:
> >
> > 大佬们好,我是flink新手,正在用flink 1.9.1
> > Flink APP cannot run, APP log error,  想求教一下会是什么原因造成的,多谢🙏
> >
> > 2020-06-16 17:06:21,921 WARN  org.apache.flink.client.cli.CliFrontend
> >- Could not load CLI class
> > org.apache.flink.yarn.cli.FlinkYarnSessionCli.
> > java.lang.NoClassDefFoundError:
> > org/apache/hadoop/yarn/exceptions/YarnException
> > at java.lang.Class.forName0(Native Method)
> > at java.lang.Class.forName(Class.java:264)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1185)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1145)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070)
> > Caused by: java.lang.ClassNotFoundException:
> > org.apache.hadoop.yarn.exceptions.YarnException
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > ... 5 more
> > 2020-06-16 17:06:21,980 INFO  org.apache.flink.core.fs.FileSystem
> >- Hadoop is not in the classpath/dependencies. The
> extended
> > set of supported File Systems via Hadoop is not available.
>
>


Re: Flink yarn session exception

2020-07-16 Thread 忝忝向仧
你可以看看lib里面的包跟官网的要求是不是一样的



发自我的iPhone


-- Original --
From: Rainie Li https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
> <
> 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
> >
> Best,
> Paul Lam
>
> > 2020年7月16日 15:46,Rainie Li 

Re: Flink on k8s 中,Jar 任务 avatica-core 依赖和 flink-table jar 冲突问题

2020-07-16 Thread LakeShen
嗯嗯,Congxian,感谢你的回复,我通过 Maven Shaded 解决问题😁。

Congxian Qiu  于2020年7月16日周四 下午8:19写道:

> Hi
>
> 你的图挂了,如果单纯想解决 jar 包冲突的问题,那么 maven shade plugin[1] 或许对你有用
>
> [1]
>
> https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html
> Best,
> Congxian
>
>
> LakeShen  于2020年7月16日周四 下午6:03写道:
>
> > Hi 社区,
> >
> > 我现在正在迁移任务到 k8s ,目前版本为 Flink 1.6 版本,k8s 上面作业运行模式为 standalone per job.
> >
> > 现在遇到一个问题,业务方 Flink jar 任务使用了 org.apache.calcite.avatica 依赖,也就是下面依赖:
> > 
> > org.apache.calcite.avatica
> > avatica-core
> > ${avatica.version}
> > 
> >
> > 但是这个依赖其实在 flink-table 模块中,也有这个依赖:
> > [image: image.png]
> >
> > 由于 flink on k8s  standalone per job 模式,会把 Flink 任务 jar 包放入到 flink 本身的lib
> > 包中,我在任务启动的时候,就会报:
> > Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> > org.apache.calcite.avatica.ConnectionPropertiesImpl 错误。
> >
> > 按照我的理解,由于 Flink jar 任务包中有 avatica-core 依赖,同时在 flink lib
> > 目录下面,flink-table_2.11-1.6-RELEASE.jar 中也有这个依赖,这两个都在 lib 目录下,然后就出现了类冲突问题。
> >
> > 请问怎么解决这个问题呢,非常期待你的回复。
> >
> > Best,
> > LakeShen
> >
> >
> >
>


flink1.9写权限认证的es6

2020-07-16 Thread Dream-底限
hi:
请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Peihui He
Hi Congxian

见附件。

Best wishes.

Congxian Qiu  于2020年7月16日周四 下午8:24写道:

> Hi Peihui
>
> 感谢你的回信。能否帮忙用 1.10.0 复现一次,然后把相关的日志(JM log 和 TM Log,方便的话,也开启一下 debug
> 日志)分享一下呢?如果日志太大的话,可以尝试贴待 gist[1] 然后邮件列表回复一个地址即可,
> 非常感谢~
>
> [1] https://gist.github.com/
>
> Best,
> Congxian
>
>
> Peihui He  于2020年7月16日周四 下午5:54写道:
>
> > Hi Yun,
> >
> > 我这边测试需要在集群上跑的,本地idea跑是没有问题的。
> > flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。
> >
> > 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。
> >
> > Peihui He  于2020年7月16日周四 下午5:26写道:
> >
> >> Hi Yun,
> >>
> >> 作业没有开启local recovery, 我这边测试1.10.0是必现的。
> >>
> >> Best wishes.
> >>
> >> Yun Tang  于2020年7月16日周四 下午5:04写道:
> >>
> >>> Hi Peihui
> >>>
> >>> Flink-1.10.1
> >>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
> >>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
> >>>
> >>>
> >>> [1]
> >>>
> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
> >>> 祝好
> >>> 唐云
> >>> 
> >>> From: Peihui He 
> >>> Sent: Thursday, July 16, 2020 16:15
> >>> To: user-zh@flink.apache.org 
> >>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >>>
> >>> Hi Yun,
> >>>
> >>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
> >>> 输入的特定的word抛出runtimeexception 使task
> >>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
> >>>
> >>> Caused by: java.nio.file.NoSuchFileException:
> >>>
> >>>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> >>> ->
> >>>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> >>>
> >>> 情况和@chenxyz 类似。
> >>>
> >>>
> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html
> >>>
> >>> 换成1.10.1 就可以了
> >>>
> >>> Best wishes.
> >>>
> >>> Yun Tang  于2020年7月15日周三 下午4:35写道:
> >>>
> >>> > Hi Robin
> >>> >
> >>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
> >>> >
> >>>
> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
> >>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
> >>> >
> >>> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root
> >>> cause,还请在日志中找一下无法恢复的root
> >>> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
> >>> >
> >>> >
> >>> > [1]
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
> >>> > [2]
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
> >>> >
> >>> > 祝好
> >>> > 唐云
> >>> >
> >>> >
> >>> > 
> >>> > From: Robin Zhang 
> >>> > Sent: Wednesday, July 15, 2020 16:23
> >>> > To: user-zh@flink.apache.org 
> >>> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >>> >
> >>> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
> >>> >
> >>> > Best
> >>> > Robin Zhang
> >>> > 
> >>> > From: Peihui He <[hidden email]>
> >>> > Sent: Tuesday, July 14, 2020 10:42
> >>> > To: [hidden email] <[hidden email]>
> >>> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >>> >
> >>> > hello,
> >>> >
> >>> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
> >>> >
> >>> >
> >>> > Caused by: java.nio.file.NoSuchFileException:
> >>> >
> >>> >
> >>>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> >>> > ->
> >>> >
> >>> >
> >>>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> >>> >
> >>> > 配置和1.9.2 一样:
> >>> > state.backend: rocksdb
> >>> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> >>> > state.savepoints.dir: hdfs:///flink/savepoints/wc/
> >>> > state.backend.incremental: true
> >>> >
> >>> > 代码上都有
> >>> >
> >>> > env.enableCheckpointing(1);
> >>> >
> >>> >
> >>>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >>> > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> >>> > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
> >>> >
> >>> >
> >>> >   是1.10.0 需要做什么特别配置么?
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > Se

Re: flink 1.11 checkpoint使用

2020-07-16 Thread Jark Wu
Hi,

能确认一下 kafka 中有完整的全量数据吗? 也就是 这个 DELETE 消息之前,有对应的 INSERT 消息吗?
如果没有的话,是可能会发生这个现象的(DELETE 在 group by 节点会被认为脏数据而丢掉)。
当然也可以像 godfrey 建议的那样,不 groupby,直接全部字段 INSERT INTO sink,DELETE 就不会被丢弃掉。

Best,
Jark

On Thu, 16 Jul 2020 at 21:56, godfrey he  wrote:

> 为什么要 GROUP BY id,name ,description, weight ?
> 直接 "INSERT INTO sink SELECT  id,name ,description, weight FROM
> debezium_source" 不能满足需求?
>
> 曹武 <14701319...@163.com> 于2020年7月16日周四 下午9:30写道:
>
> > 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
> > 从checkpoint恢复以后,新来op=d的数据会删除失败
> > 重启命令:./bin/flink run -m yarn-cluster  /root/bigdata-flink-1.0.jar -s
> >
> >
> hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
> > 代码:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
> > .useBlinkPlanner()
> > .inStreamingMode()
> > .build();
> >
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >
> > env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
> > env.getCheckpointConfig().setCheckpointTimeout(6000L); // 超时时间
> > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //
> > 最大允许同时出现几个CheckPoint
> > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); //
> > 最小得间隔时间
> > env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
> //
> > 是否倾向于用CheckPoint做故障恢复
> > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
> > //
> > 容忍多少次CheckPoint失败
> > //Checkpoint文件清理策略
> >
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > //Checkpoint外部文件路径
> > env.setStateBackend(new FsStateBackend(new
> > URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
> > TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
> > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> > settings);
> > String sourceDDL = String.format(
> > "CREATE TABLE debezium_source (" +
> > " id INT NOT NULL," +
> > " name STRING," +
> > " description STRING," +
> > " weight Double" +
> > ") WITH (" +
> > " 'connector' = 'kafka-0.11'," +
> > " 'topic' = '%s'," +
> > " 'properties.bootstrap.servers' = '%s'," +
> > " 'scan.startup.mode' = 'group-offsets'," +
> > " 'format' = 'debezium-json'" +
> > ")", "ddd", " 172.22.20.206:9092");
> > String sinkDDL = "CREATE TABLE sink (" +
> > " id INT NOT NULL," +
> > " name STRING," +
> > " description STRING," +
> > " weight Double," +
> > " PRIMARY KEY (id,name, description,weight) NOT ENFORCED
> "
> > +
> > ") WITH (" +
> > " 'connector' = 'jdbc'," +
> > " 'url' =
> > 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
> > " 'table-name' = 'products'," +
> > " 'driver'= 'com.mysql.cj.jdbc.Driver'," +
> > " 'username'='DataPip'," +
> > " 'password'='DataPip'" +
> > ")";
> > String dml = "INSERT INTO sink SELECT  id,name ,description,
> weight
> > FROM debezium_source GROUP BY id,name ,description, weight";
> > tEnv.executeSql(sourceDDL);
> > tEnv.executeSql(sinkDDL);
> > tEnv.executeSql(dml);
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


flink connector formats问题

2020-07-16 Thread 酷酷的浑蛋
请问flink可以自定义format吗,目前提供的format必须要进行一次数据过滤为规则数据才行,可不可以自定义format实现自己的数据格式source呢?
目前flink支持的:
| 格式 | 支持的连接器 |
| CSV | Apache Kafka, Filesystem |
| JSON | Apache Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Filesystem |
| Debezium CDC | Apache Kafka |
| Canal CDC | Apache Kafka |
| Apache Parquet | Filesystem |
| Apache ORC | Filesystem |

回复:flink connector formats问题

2020-07-16 Thread 夏帅
你好,这个是可以进行自定义的
参考https://jxeditor.github.io/2020/06/11/FlinkSQL%E8%87%AA%E5%AE%9A%E4%B9%89FORMAT_TYPE/


--
发件人:酷酷的浑蛋 
发送时间:2020年7月17日(星期五) 10:42
收件人:user-zh 
主 题:flink connector formats问题

请问flink可以自定义format吗,目前提供的format必须要进行一次数据过滤为规则数据才行,可不可以自定义format实现自己的数据格式source呢?
目前flink支持的:
| 格式 | 支持的连接器 |
| CSV | Apache Kafka, Filesystem |
| JSON | Apache Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Filesystem |
| Debezium CDC | Apache Kafka |
| Canal CDC | Apache Kafka |
| Apache Parquet | Filesystem |
| Apache ORC | Filesystem |

回复:flink1.9写权限认证的es6

2020-07-16 Thread 夏帅
你好,请问是FlinkSQL么
FLinkSQL可以参考下这份邮件
http://apache-flink.147419.n8.nabble.com/ddl-es-td2094.html
DataStream可以尝试自定义ElasticsearchSink实现权限认证


--
发件人:Dream-底限 
发送时间:2020年7月17日(星期五) 10:12
收件人:user-zh 
主 题:flink1.9写权限认证的es6

hi:
请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下


Re: Flink yarn session exception

2020-07-16 Thread Rainie Li
好搭,谢谢!

On Thu, Jul 16, 2020 at 5:32 PM 忝忝向仧 <153488...@qq.com> wrote:

> 你可以看看lib里面的包跟官网的要求是不是一样的
>
>
>
> 发自我的iPhone
>
>
> -- Original --
> From: Rainie Li  Date: Fri,Jul 17,2020 1:06 AM
> To: user-zh  Subject: Re: Flink yarn session exception
>
>
>
> 多谢,我set了这些envs:
>
> export JAVA_HOME=/usr/lib/jvm/java-8-oracle
> export PATH=$JAVA_HOME/bin:$PATH
> export HADOOP_HOME=/usr/local/hadoop
> export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
> export HADOOP_CLASSPATH=`hadoop classpath`
> export FLINK_CONF_DIR=/etc/flink-1.9.1/conf
> export FLINK_LOG_DIR=/home/rainieli/
>
> 有什么问题吗?🙏
>
>
> On Thu, Jul 16, 2020 at 1:12 AM Paul Lam 
> > 日志里说得比较清楚了,classpath 里没有 Hadoop 的 lib。可以参考这个文档 [1] 来配置你的环境。
> >
> > 1.
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
> > <
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html
> > >
> > Best,
> > Paul Lam
> >
> > > 2020年7月16日 15:46,Rainie Li  写道:
> > >
> > > 大佬们好,我是flink新手,正在用flink 1.9.1
> > > Flink APP cannot run, APP log error,  想求教一下会是什么原因造成的,多谢🙏
> > >
> > > 2020-06-16 17:06:21,921 WARN 
> org.apache.flink.client.cli.CliFrontend
> >
> >   
> - Could not load CLI class
> > > org.apache.flink.yarn.cli.FlinkYarnSessionCli.
> > > java.lang.NoClassDefFoundError:
> > > org/apache/hadoop/yarn/exceptions/YarnException
> > > at java.lang.Class.forName0(Native Method)
> > > at java.lang.Class.forName(Class.java:264)
> > > at
> > >
> >
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1185)
> > > at
> > >
> >
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1145)
> > > at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1070)
> > > Caused by: java.lang.ClassNotFoundException:
> > > org.apache.hadoop.yarn.exceptions.YarnException
> > > at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > > ... 5 more
> > > 2020-06-16 17:06:21,980 INFO 
> org.apache.flink.core.fs.FileSystem
> >
> >   
> - Hadoop is not in the classpath/dependencies. The
> > extended
> > > set of supported File Systems via Hadoop is not available.
> >
> >


回复:flink connector formats问题

2020-07-16 Thread 酷酷的浑蛋


我看您写了'format.type' = ‘custom',这个custom 是跟哪里关联的呢? 还是说这里要写类路径?


在2020年07月17日 10:47,夏帅 写道:
你好,这个是可以进行自定义的
参考https://jxeditor.github.io/2020/06/11/FlinkSQL%E8%87%AA%E5%AE%9A%E4%B9%89FORMAT_TYPE/


--
发件人:酷酷的浑蛋 
发送时间:2020年7月17日(星期五) 10:42
收件人:user-zh 
主 题:flink connector formats问题

请问flink可以自定义format吗,目前提供的format必须要进行一次数据过滤为规则数据才行,可不可以自定义format实现自己的数据格式source呢?
目前flink支持的:
| 格式 | 支持的连接器 |
| CSV | Apache Kafka, Filesystem |
| JSON | Apache Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Filesystem |
| Debezium CDC | Apache Kafka |
| Canal CDC | Apache Kafka |
| Apache Parquet | Filesystem |
| Apache ORC | Filesystem |

回复:flink connector formats问题

2020-07-16 Thread 酷酷的浑蛋
找到了,谢谢


| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制


在2020年07月17日 10:57,酷酷的浑蛋 写道:


我看您写了'format.type' = ‘custom',这个custom 是跟哪里关联的呢? 还是说这里要写类路径?


在2020年07月17日 10:47,夏帅 写道:
你好,这个是可以进行自定义的
参考https://jxeditor.github.io/2020/06/11/FlinkSQL%E8%87%AA%E5%AE%9A%E4%B9%89FORMAT_TYPE/


--
发件人:酷酷的浑蛋 
发送时间:2020年7月17日(星期五) 10:42
收件人:user-zh 
主 题:flink connector formats问题

请问flink可以自定义format吗,目前提供的format必须要进行一次数据过滤为规则数据才行,可不可以自定义format实现自己的数据格式source呢?
目前flink支持的:
| 格式 | 支持的连接器 |
| CSV | Apache Kafka, Filesystem |
| JSON | Apache Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Filesystem |
| Debezium CDC | Apache Kafka |
| Canal CDC | Apache Kafka |
| Apache Parquet | Filesystem |
| Apache ORC | Filesystem |

Re: flink 1.11任务提交的问题

2020-07-16 Thread godfrey he
hi sunfulin,
目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。

Best,
Godfrey

Leonard Xu  于2020年7月17日周五 上午12:12写道:

> Hi,
>
> 我理解目前好像做不到, cc: godfrey 大佬看看
>
> 祝好,
> Leonard Xu
>
> > 在 2020年7月16日,23:08,sunfulin  写道:
> >
> > hi,
> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
> > 通过StreamExecutionEnvironment.execute提交,yarn
> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
>
>


flink-1.11 KafkaDynamicTableSouce groupBy 结果怎样发送到 kafka

2020-07-16 Thread wangl...@geekplus.com.cn

 INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*) from 
kafka_ods_artemis_out_order group by warehouse_id;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table sink 
'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming update 
changes which is produced by node GroupAggregate(groupBy=[warehouse_id], 
select=[warehouse_id, COUNT(*) AS EXPR$1])

在 Flink-1.10 中可以更改 KafkaTableSinkBase 让它 implements RetractStream 实现。
 
我看现在 Flink-1.11 中是用了  KafkaDynamicSource, KafkaDynamicSink,这样怎样改动才能让 GroupBy 
的结果也发送到 Kafka 呢?

谢谢,
王磊 


wangl...@geekplus.com.cn 



Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 Thread Congxian Qiu
Hi  Peihui

感谢你的回复,我这边没有看到附件,你那边能否确认下呢?

Best,
Congxian


Peihui He  于2020年7月17日周五 上午10:13写道:

> Hi Congxian
>
> 见附件。
>
> Best wishes.
>
> Congxian Qiu  于2020年7月16日周四 下午8:24写道:
>
>> Hi Peihui
>>
>> 感谢你的回信。能否帮忙用 1.10.0 复现一次,然后把相关的日志(JM log 和 TM Log,方便的话,也开启一下 debug
>> 日志)分享一下呢?如果日志太大的话,可以尝试贴待 gist[1] 然后邮件列表回复一个地址即可,
>> 非常感谢~
>>
>> [1] https://gist.github.com/
>>
>> Best,
>> Congxian
>>
>>
>> Peihui He  于2020年7月16日周四 下午5:54写道:
>>
>> > Hi Yun,
>> >
>> > 我这边测试需要在集群上跑的,本地idea跑是没有问题的。
>> > flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。
>> >
>> > 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。
>> >
>> > Peihui He  于2020年7月16日周四 下午5:26写道:
>> >
>> >> Hi Yun,
>> >>
>> >> 作业没有开启local recovery, 我这边测试1.10.0是必现的。
>> >>
>> >> Best wishes.
>> >>
>> >> Yun Tang  于2020年7月16日周四 下午5:04写道:
>> >>
>> >>> Hi Peihui
>> >>>
>> >>> Flink-1.10.1
>> >>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
>> >>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
>> >>>
>> >>>
>> >>> [1]
>> >>>
>> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
>> >>> 祝好
>> >>> 唐云
>> >>> 
>> >>> From: Peihui He 
>> >>> Sent: Thursday, July 16, 2020 16:15
>> >>> To: user-zh@flink.apache.org 
>> >>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >>>
>> >>> Hi Yun,
>> >>>
>> >>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
>> >>> 输入的特定的word抛出runtimeexception 使task
>> >>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
>> >>>
>> >>> Caused by: java.nio.file.NoSuchFileException:
>> >>>
>> >>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>> >>> ->
>> >>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>> >>>
>> >>> 情况和@chenxyz 类似。
>> >>>
>> >>>
>> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html
>> >>>
>> >>> 换成1.10.1 就可以了
>> >>>
>> >>> Best wishes.
>> >>>
>> >>> Yun Tang  于2020年7月15日周三 下午4:35写道:
>> >>>
>> >>> > Hi Robin
>> >>> >
>> >>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性
>> >>> >
>> >>>
>> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state
>> >>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的.
>> >>> >
>> >>> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root
>> >>> cause,还请在日志中找一下无法恢复的root
>> >>> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。
>> >>> >
>> >>> >
>> >>> > [1]
>> >>> >
>> >>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>> >>> > [2]
>> >>> >
>> >>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>> >>> >
>> >>> > 祝好
>> >>> > 唐云
>> >>> >
>> >>> >
>> >>> > 
>> >>> > From: Robin Zhang 
>> >>> > Sent: Wednesday, July 15, 2020 16:23
>> >>> > To: user-zh@flink.apache.org 
>> >>> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >>> >
>> >>> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
>> >>> >
>> >>> > Best
>> >>> > Robin Zhang
>> >>> > 
>> >>> > From: Peihui He <[hidden email]>
>> >>> > Sent: Tuesday, July 14, 2020 10:42
>> >>> > To: [hidden email] <[hidden email]>
>> >>> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>> >>> >
>> >>> > hello,
>> >>> >
>> >>> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>> >>> >
>> >>> >
>> >>> > Caused by: java.nio.file.NoSuchFileException:
>> >>> >
>> >>> >
>> >>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
>> >>> > ->
>> >>> >
>> >>> >
>> >>>
>> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>> >>> >
>> >>> > 配置和1.9.2 一样:
>> >>> > state.backend: rocksdb
>> >>> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
>> >>> > state.savepoints.dir: hdfs:///flink/savepoints/wc/
>> >>> > state.backend.incremental: true
>> >>> >
>> >>> > 代码上都有
>> >>> >
>> >>> > env.enableCheckpointing(1);
>> >>> >
>> >>> >
>> >>>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCEL

flink-1.11 DDL ????hdfs???? Cannot instantiate user function

2020-07-16 Thread kcz
standalone 
lib  jar??
flink-connector-hive_2.11-1.11.0.jar        
flink-json-1.11.0.jar                
        flink-sql-connector-kafka_2.12-1.11.0.jar  
log4j-api-2.12.1.jar
flink-csv-1.11.0.jar                
        flink-parquet_2.11-1.11.0.jar      
          flink-table_2.11-1.11.0.jar    
            log4j-core-2.12.1.jar
flink-dist_2.11-1.11.0.jar              
    flink-shaded-hadoop-2-uber-2.7.2.11-9.0.jar  
flink-table-blink_2.11-1.11.0.jar          
log4j-slf4j-impl-2.12.1.jar
flink-hadoop-compatibility_2.11-1.11.0.jar  
flink-shaded-zookeeper-3.4.14.jar            
log4j-1.2-api-2.12.1.jar





??idea
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
// 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);;
env.setStateBackend(new FsStateBackend(path));

tableEnv.executeSql("CREATE TABLE source_table (\n" +
"\thost STRING,\n" +
"\turl STRING,\n" +
"\tpublic_date STRING\n" +
") WITH (\n" +
"\t'connector.type' = 'kafka',\n" +
"\t'connector.version' = 'universal',\n" +
"\t'connector.startup-mode' = 'latest-offset',\n" +
"\t'connector.topic' = 'test_flink_1.11',\n" +
"\t'connector.properties.group.id' = 'domain_testGroup',\n" +
"\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
"\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
"\t'update-mode' = 'append',\n" +
"\t'format.type' = 'json',\n" +
"\t'format.derive-schema' = 'true'\n" +
")");

tableEnv.executeSql("CREATE TABLE fs_table (\n" +
"  host STRING,\n" +
"  url STRING,\n" +
"  public_date STRING\n" +
") PARTITIONED BY (public_date) WITH (\n" +
"  'connector'='filesystem',\n" +
"  'path'='path',\n" +
"  'format'='json',\n" +
"  'sink.partition-commit.delay'='0s',\n" +
"  'sink.partition-commit.policy.kind'='success-file'\n" +
")");

tableEnv.executeSql("INSERT INTO  fs_table SELECT host, url, 
DATE_FORMAT(public_date, '-MM-dd') FROM source_table");
TableResult result = tableEnv.executeSql("SELECT * FROM fs_table ");
result.print();

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
instantiate user function.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.

Re: flink1.9写权限认证的es6

2020-07-16 Thread Yangze Guo
Hi,

SQL添加认证的逻辑已经在FLINK-18361[1] 中完成了,1.12版本会支持这个功能

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Fri, Jul 17, 2020 at 10:12 AM Dream-底限  wrote:
>
> hi:
> 请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下


Re:Re: flink 1.11任务提交的问题

2020-07-16 Thread sunfulin



hi,
感谢回复。这个机制我理解了。想了解下,有办法在1.11里仍然使用1.10版本的作业提交机制么?我现在虽然把代码回滚到1.10版本的逻辑,但是提交作业仍然有问题:比如我如果不执行env.execute,那么table
 to DataStream的语句不会生成拓扑。











在 2020-07-17 12:09:20,"godfrey he"  写道:
>hi sunfulin,
>目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
>即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
>虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。
>
>Best,
>Godfrey
>
>Leonard Xu  于2020年7月17日周五 上午12:12写道:
>
>> Hi,
>>
>> 我理解目前好像做不到, cc: godfrey 大佬看看
>>
>> 祝好,
>> Leonard Xu
>>
>> > 在 2020年7月16日,23:08,sunfulin  写道:
>> >
>> > hi,
>> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
>> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
>> > 通过StreamExecutionEnvironment.execute提交,yarn
>> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
>>
>>


回复: flink1.9写权限认证的es6

2020-07-16 Thread 夏帅
get到了





来自钉钉专属商务邮箱--
发件人:Yangze Guo
日 期:2020年07月17日 13:38:35
收件人:user-zh
主 题:Re: flink1.9写权限认证的es6

Hi,

SQL添加认证的逻辑已经在FLINK-18361[1] 中完成了,1.12版本会支持这个功能

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Fri, Jul 17, 2020 at 10:12 AM Dream-底限  wrote:
>
> hi:
> 请问flink如何将数据写入到权限认证的es集群哪,没找到配置用户名密码的地方,哪位大佬帮忙解答一下



Re:Re:Re: flink 1.11任务提交的问题

2020-07-16 Thread sunfulin
hi,
补充一下,1.10版本的代码使用sqlUpdate + 
table2datastream,并通过StreamExecutionEnvironment.execute来提交。我回滚到1.10版本的代码后,因为我看1.11版本里如果使用sqlUpdate执行insertInto,必须使用StreamTableEnvironment.execute来提交。现在我的问题就是这个:我想通过一个job来提交。现在有机制可以做不?在1.11版本里执行。因为之前的job逻辑较为复杂,做拆分的话还有点麻烦。
















在 2020-07-17 13:55:21,"sunfulin"  写道:




hi,
感谢回复。这个机制我理解了。想了解下,有办法在1.11里仍然使用1.10版本的作业提交机制么?我现在虽然把代码回滚到1.10版本的逻辑,但是提交作业仍然有问题:比如我如果不执行env.execute,那么table
 to DataStream的语句不会生成拓扑。











在 2020-07-17 12:09:20,"godfrey he"  写道:
>hi sunfulin,
>目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
>即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
>虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。
>
>Best,
>Godfrey
>
>Leonard Xu  于2020年7月17日周五 上午12:12写道:
>
>> Hi,
>>
>> 我理解目前好像做不到, cc: godfrey 大佬看看
>>
>> 祝好,
>> Leonard Xu
>>
>> > 在 2020年7月16日,23:08,sunfulin  写道:
>> >
>> > hi,
>> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
>> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
>> > 通过StreamExecutionEnvironment.execute提交,yarn
>> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
>>
>>





 

??????flink-1.11 DDL ????hdfs???? Cannot instantiate user function

2020-07-16 Thread kcz
??bug??
classloader.resolve-order: parent-first
??bug??parquet


--  --
??: 
   "kcz"
<573693...@qq.com>;
: 2020??7??17??(??) 1:32
??: "user-zh"

Re: Re:Re: flink 1.11任务提交的问题

2020-07-16 Thread godfrey he
做不到,1.11里把 StreamExecutionEnvironment.execute 和
StreamTableEnvironment.execute 的逻辑已经切分干净了。
有个改动比较小的方案可以参考:可以在原来的逻辑的基础上,把两种提交job的方式放到两个不同的类中,其他的逻辑放到另外一个类共性。

sunfulin  于2020年7月17日周五 下午2:00写道:

> hi,
> 补充一下,1.10版本的代码使用sqlUpdate +
> table2datastream,并通过StreamExecutionEnvironment.execute来提交。我回滚到1.10版本的代码后,因为我看1.11版本里如果使用sqlUpdate执行insertInto,必须使用StreamTableEnvironment.execute来提交。现在我的问题就是这个:我想通过一个job来提交。现在有机制可以做不?在1.11版本里执行。因为之前的job逻辑较为复杂,做拆分的话还有点麻烦。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-17 13:55:21,"sunfulin"  写道:
>
>
>
>
> hi,
> 感谢回复。这个机制我理解了。想了解下,有办法在1.11里仍然使用1.10版本的作业提交机制么?我现在虽然把代码回滚到1.10版本的逻辑,但是提交作业仍然有问题:比如我如果不执行env.execute,那么table
> to DataStream的语句不会生成拓扑。
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-17 12:09:20,"godfrey he"  写道:
> >hi sunfulin,
> >目前这个做不到。executeSQL 和 table to DataStream 是分别优化和提交作业的。
> >即使在1.11 之前,table to DataStream 也不会和 sqlUpdate 或者 insertInto 的语句一起优化,
> >虽然只提交了一个job,但是是两个独立的pipeline,也没有计算复用,和两个job没啥差别。
> >
> >Best,
> >Godfrey
> >
> >Leonard Xu  于2020年7月17日周五 上午12:12写道:
> >
> >> Hi,
> >>
> >> 我理解目前好像做不到, cc: godfrey 大佬看看
> >>
> >> 祝好,
> >> Leonard Xu
> >>
> >> > 在 2020年7月16日,23:08,sunfulin  写道:
> >> >
> >> > hi,
> >> > 请教下flink 1.11任务提交的问题。如果我的一个作业里既有sql
> >> dml提交(executeSQL执行),又通过DataStream.addSink来写出,
> >> > 通过StreamExecutionEnvironment.execute提交,yarn
> >> per-job貌似会提交两个作业。这种情况下,我该如何处理呢?只想提交一个作业。
> >>
> >>
>
>
>
>
>
>


SQL 报错只有 flink runtime 的 NPE

2020-07-16 Thread Luan Cooper
Hi

我有这么一个 SQL
INSERT INTO es
SELECT
a,
udf_xxx(b)
FROM mongo_oplog -- 自定义 TableFactory

Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码 Exception,可以稳定重现

LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1)
(bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.

java.lang.NullPointerException

at StreamExecCalc$8016.split$7938$(Unknown Source)

at StreamExecCalc$8016.processElement(Unknown Source)

at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)

at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)

at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)

at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)

at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)

请问这种怎样情况排查问题?
有任何线索都可以

感谢


Re:flink1.10升级到flink1.11 jar冲突

2020-07-16 Thread xyq
hello  大家好  flink由1.10升级到1.11
从savepoint处恢复数据报错(这个报错的是flink sql双流join的,带状态,其他的stream的单流程序都已经照常恢复)
请大家帮忙指导一下,谢谢。


报错如下:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Number of retries has been exhausted.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Number of retries has been exhausted.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at 
com.zhidao.bigdata.plat.dophin.streaming.warehouse.dwd.iotbind.etl.DwdBaseCarlifeTIotBindGeoManage.main(DwdBaseCarlifeTIotBindGeoManage.java:282)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Number of retries has been exhausted.
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:302)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:323)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:339)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadE

Re: SQL 报错只有 flink runtime 的 NPE

2020-07-16 Thread godfrey he
udf_xxx的逻辑是啥?


Luan Cooper  于2020年7月17日周五 下午2:40写道:

> Hi
>
> 我有这么一个 SQL
> INSERT INTO es
> SELECT
> a,
> udf_xxx(b)
> FROM mongo_oplog -- 自定义 TableFactory
>
> Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码 Exception,可以稳定重现
>
> LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1)
> (bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.
>
> java.lang.NullPointerException
>
> at StreamExecCalc$8016.split$7938$(Unknown Source)
>
> at StreamExecCalc$8016.processElement(Unknown Source)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>
> at
> org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>
> at
> org.apache.flink.streaming.runtime.io
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>
> at
> org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>
> at java.lang.Thread.run(Thread.java:748)
>
> 请问这种怎样情况排查问题?
> 有任何线索都可以
>
> 感谢
>