hi、
我这面在使用flink1.11.2做实时特征的时候,状态大小大约在30g左右的时候任务就不能继续运行了,而查看异常日志发现大量的InterruptedException,请问这种情况是集群的问题还是flink的问题,而另一个3G状态的任务依然正常运行
Hi,想请教一下大家:
最近通过flink_taskmanager_job_task_operator_KafkaConsumer_records_consumed_rate指标发现,
flink某个任务消费一个topic A 竟然比消费topic A,B,C,D一起的指标要高,
也就是我四个topic每秒消费的数据竟然还没其中一个topic每秒消费的数据高,
所以想请问:flink1.9.1 支持一个 source 指定消费多个 topics么?
我的代码如下:
val A= params.getProperty("kafka.scene.data.topic")
val
版本1.11.2
用这种方式FlinkKafkaProducer(String topicId, SerializationSchema
serializationSchema, Properties producerConfig)生产没问题,数据写入无报错
这种方式
FlinkKafkaProducer(
String defaultTopicId,
KeyedSerializationSchema serializationSchema,
Properties producerConfig,
Hi flink1.12group
by current_date,userId ??flink
state?? 1??Stream??TTL
2??tabEnv.getConfig().setIdleStateRetention(Duration.ofDay
Hi,
flink 1.11 on k8sjoin??sql??rocksdbbackend??flink
managedflink??state.backend.rocksdb.memory.managed=truek8s??pod
flink sql:
insert into console_sink
select t1.*, t2.*
from t1 left join t2
on t1.u
?? org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl#create
?? ??
if (!settings.isStreamingMode()) {
throw new TableException(
"StreamTableEnvironment can not run in batch
mode for now, please us
我现在也碰到了这个问题,也是删除了方法中检查模式的代码 settings.isStreamingMode()。源码为什么要加这样一个检测呢? 感觉
StreamTableEnvironmentImpl 原来是能跑批的,现在加上这个检测 反而不能了。我看最新版的 1.12 还有这个检测代码呢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
有,但是贴做附件时因为超长没法发出去
:
ezmlm-reject: fatal: Sorry, I don't accept messages larger than 100 bytes
(#5.2.3)
发你私人邮箱是否方便
guoliubi...@foxmail.com
From: magichuang
Date: 2020-12-17 20:18
To: user-zh
Subject: Re: Re: flink clickhouse connector
您是用java写的还是pyflink 啊? 我是用pyflink写的程序,所以需要一个jar包,您那里
Hi,
我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
.process(new ProcessFunction() {
@Override
public void processElement(RatioValuevalue, Context ctx,
Collector out) throws Exception {
out.collect(value);
ctx
Dear All,
Flink.11.2操作hive时,对hive的版本支持是怎样的
看官网介绍是支持1.0、1.1、1.2、2.0、2.1、2.2、2.3、3.1
我的执行环境:
*Flink : 1.11.2*
*Haoop : 2.6.0-cdh5.8.3*
*Hive : 1.1.0-cdh5.8.3*
*Job运行方式 : on yarn*
同时对读写hive的demo,我不知道我写的是否正确:
public static void main(String[] args) throws Exception {
EnvironmentSettin
state.backend.incremental 出现问题的时候增量模式是开启的吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi all When I use SQL with UDTF, when I call the
tableEnv.sqlQuery () method, I throw the following error: Rowtime attributes
must not be in the input rows of a regular join. As a workaround you can cast
the time attributes of input tables to TIMESTAMP before. I used the
to_timestamp functi
OK,我看目前flink-docker项目里面的docker-entrypoint.sh是正常,有其他问题你再继续反馈
Best,
Yang
superainbower 于2020年12月18日周五 上午8:33写道:
> hi,我重新git下来,build又可以了,可能之前我下的有文件有问题
>
> 在2020年12月17日 14:08,Yang Wang 写道:
> 你直接clone下来,然后cd到1.12目录,这样build出来的镜像也是可以的
>
> 你用build的镜像启动Flink任务是报什么错呢,我这边试了一下是可以正常运行的
>
> - git clone ht
mini batch默认为false 。题主问题找到了吗
--
Sent from: http://apache-flink.147419.n8.nabble.com/
由失误操作使用了企业邮箱订阅,目前无法取消订阅,向user-zh-unsubscribe发送多封邮件也无效?请问官方有解决办法么?
Hi~谢谢 Yun Tang 大佬的解答~
不过这个指标不能单任务配置么?官网有这么个提示:
"启用本机指标可能会导致性能下降,应谨慎设置"[1]
所以如果全局配置,其他没有用RocksDB的任务也会尝试发送这个指标,那会不会导致其他任务的性能下降?感觉这样不是很科学啊?
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#rocksdb-native-metrics
-
Best Wishes
--
Sent from: http://apache-flink.
问题我自己已经解决。
> 在 2020年12月17日,下午9:00,丁浩浩 <18579099...@163.com> 写道:
>
> flink版本:1.11.1
> udaf函数代码来自于阿里云官网文档
>
> 以下是代码
> public class TestSql {
>public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment env =
> StreamExecutionEnviro
https://issues.apache.org/jira/browse/FLINK-20646
Thank you~
Xintong Song
On Thu, Dec 17, 2020 at 11:40 PM zhisheng wrote:
> hi,xintong
>
> 有对应的 Issue ID 吗?
>
> Xintong Song 于2020年12月17日周四 下午4:48写道:
>
> > 确实是 1.12.0 的 bug。
> > 我们在所有用到 state 的地方都应该去声明 ManagedMemoryUseCase.STATE_BACKEND。有一个新添
1. 加jvm参数可以使用env.java.opts.taskmanager配置
2. 目前tm中没有对heap memory进行slot间细粒度管理,session模式下不支持这种功能
Best,
Yangze Guo
On Fri, Dec 18, 2020 at 9:22 AM guoliubi...@foxmail.com
wrote:
>
> Hi,
> 现在使用的是flink1.12,使用standalone cluster模式运行。
> 在上面运行一个Job内存消耗大,会用满heap然后把整个task manager带崩掉。
> 想问下怎么给task manager的jv
我也是在这个讨论群学到的,你看下能否解答你的问题
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#env-java-opts
这个里面可以配置task manager的虚拟机参数,在虚拟机参数里配置 -XX:OnOutOfMemoryError=kill -9
%p,这样当OOM时杀掉task manager进程
yinghua...@163.com
发件人: guoliubi...@foxmail.com
发送时间: 2020-12-18 09:22
Hi,
现在使用的是flink1.12,使用standalone cluster模式运行。
在上面运行一个Job内存消耗大,会用满heap然后把整个task manager带崩掉。
想问下怎么给task manager的jvm加上heap dump相关参数。
还有是否有选项,可以在某个job吃满heap后是kill这个job而不是shutdown整个task manager,因为这个task
manager还有其他job在跑,会导致其他job一起fail。
guoliubi...@foxmail.com
hi,我重新git下来,build又可以了,可能之前我下的有文件有问题
在2020年12月17日 14:08,Yang Wang 写道:
你直接clone下来,然后cd到1.12目录,这样build出来的镜像也是可以的
你用build的镜像启动Flink任务是报什么错呢,我这边试了一下是可以正常运行的
- git clone https://github.com/apache/flink-docker.git
- cd scala_2.11-java8-debian
- sudo docker build -t flink:1.12.0 .
- docker push
hi,xintong
有对应的 Issue ID 吗?
Xintong Song 于2020年12月17日周四 下午4:48写道:
> 确实是 1.12.0 的 bug。
> 我们在所有用到 state 的地方都应该去声明 ManagedMemoryUseCase.STATE_BACKEND。有一个新添加的
> ReduceTransformation 没有做这个声明,导致所有涉及到这个算子的作业使用 RocksDB 都会出问题。
> 我马上建 issue,这个可能要推动社区加急发一个 bugfix 版本了
>
> Thank you~
>
> Xintong Song
>
>
>
Hi
这些metrics启用的配置是放到flink conf里面的,不是让你直接在代码里面调用的。
祝好
唐云
From: bradyMk
Sent: Thursday, December 17, 2020 20:56
To: user-zh@flink.apache.org
Subject: Re: flink1.9.1 如何配置RocksDB的block-cache-usage参数
谢谢 Yun Tang 大佬的解答~
另外,还想请教一下:我在代码中设置开启了cur-size-all-mem-tables的监控
flink版本:1.11.1
udaf函数代码来自于阿里云官网文档
以下是代码
public class TestSql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = FlinkUt
谢谢 Yun Tang 大佬的解答~
另外,还想请教一下:我在代码中设置开启了cur-size-all-mem-tables的监控,代码如下:
//设置RocksDB状态后端,且开启增量ck
val backend = new RocksDBStateBackend(path, true)
//监控配置项
val metricOptions = new RocksDBNativeMetricOptions
metricOptions.enableSizeAllMemTables()
//设置预选项
backend.setPrede
您是用java写的还是pyflink 啊? 我是用pyflink写的程序,所以需要一个jar包,您那里有嘛,我本地是新安装的maven,在打包
但是一直在下载依赖好多。。
> -- 原始邮件 --
> 发 件 人:"guoliubi...@foxmail.com"
> 发送时间:2020-12-17 19:36:55
> 收 件 人:user-zh
> 抄 送:
> 主 题:Re: flink clickhouse connector
>
> 我这也是往clickhouse写数据,用官方的或是其他第三方的
dear all:
我有一个flink流式任务,checkpoint周期5分钟,超时时间3分钟。
此任务中调用了第三方接口,正常情况下没问题,正常的checkpoint时长仅80ms。
但由于第三方接口发生了拥堵,有部分调用会超时(接口调用超时设置了5秒钟),
然后此算子的checkpoint就会超时,
checkpoint 3179 of job expired before completing
trying to recover from a global failure
exceeded che
我使用的是第三方的驱动,clickhouse-native-jdbc,通过JDBC的方式。
> 2020年12月17日 18:41,magichuang 写道:
>
> hi想问一下有小伙伴使用flink
> 往clickhouse里面写数据嘛?我是使用的https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
> 这个flink-connector,但是运行报错了:
>
> Caused by: java.io.IOException: unab
我这也是往clickhouse写数据,用官方的或是其他第三方的JDBC驱动(我用的https://github.com/blynkkk/clickhouse4j),然后用JdbcSink就能写入了,不需要另外写connector。
guoliubi...@foxmail.com
From: magichuang
Date: 2020-12-17 18:41
To: user-zh
Subject: flink clickhouse connector
hi想问一下有小伙伴使用flink
往clickhouse里面写数据嘛?我是使用的https://help.aliyun.
你好,我在测试1.12版本时,在虚拟机上部署了一个jobmanager,三个taskmanager;
环境信息:
1.centos7/虚拟机,双网卡(一个.4网段,一个.18网段)
2.jobmanager 1节点;taskmanager3节点,未开启高可用
配置中发现hostname
对应的是其中的一个网卡的(18网段)ip;而flink的masters/workers文件全部配置的是.4网段的ip,最重要的是jobmanager.rpc.address也配置的是.4网段,启动集群之后提交WordCount示例,提交不成功,报错与FLINK-19677一样
在节点上重
hi想问一下有小伙伴使用flink
往clickhouse里面写数据嘛?我是使用的https://help.aliyun.com/document_detail/185696.html?spm=a2c4g.11186623.6.606.6222693bubxXzX
这个flink-connector,但是运行报错了:
Caused by: java.io.IOException: unable to establish connection to ClickHouse
at
com.aliyun.flink.connector.clickhouse.table.inter
您好,镜像打完之后,向K8S提交jobmanager-job.yaml的时候,jobmanager起不来,看日志,日志里报镜像时的docker-entrypoint.sh脚本第102行缺少
),我比对了下1.12 和1.11镜像里的 docker-entrypoint.sh,1.12里102行(
_args=("${_args[@]:1}")对应的是 一个新加的 方法
disable_jemalloc_env() {
# use nameref '_args' to update the passed 'args' within function
local -n _ar
?? flink
sql??cdccdc??state??
state??
val config: TableConfig = tabEnv.getConfig
config.setIdleStateRete
print(page_turn.to_pandas())
> 可以collect到client端[1],或者可以看看另外几种方式[2]: [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/python/pyflink.table.html#pyflink.table.TableResult.collect
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-user
[Microsoft] Groups
[cid:TransparentConsumerWelcomeMailNonOutlookTopBanner]
曹 三启 added
you to flink!
Use the group to share messages and files, and to coordinate group events.
3 members
Have group discussions
With only one email address to remember, connecting with everyone is easy.
Email the gr
确实是 1.12.0 的 bug。
我们在所有用到 state 的地方都应该去声明 ManagedMemoryUseCase.STATE_BACKEND。有一个新添加的
ReduceTransformation 没有做这个声明,导致所有涉及到这个算子的作业使用 RocksDB 都会出问题。
我马上建 issue,这个可能要推动社区加急发一个 bugfix 版本了
Thank you~
Xintong Song
On Thu, Dec 17, 2020 at 11:05 AM HunterXHunter <1356469...@qq.com> wrote:
> 1.12设置 env
flink 1.10.1 同样遇到这个问题 设置了ttl但是没有生效,请问题主解决该问题了吗?
*sql*:
select
*
from
xx
group by
TUMBLE(monitor_processtime, INTERVAL '60' SECOND),topic_identity
*60s的窗口,设置的过期时间是2分钟,但是checkpoint中状态还是在变大*
*tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(2),
Time.minutes(5)); *
--
Sen
38 matches
Mail list logo