keyby() 时,还没有选好分组呢,这个只是告诉flink 要根据什么分组,所有也没有Runtime...
junjie.m...@goupwith.com 于2022年9月8日周四 14:17写道:
> hi:
> flink keyby()时能否获取到subTask的编号,根据编号分组,让上游数据可以继续保持原有的数据依然在同一个subTask中进行后续计算。
>
> 在AbstractRichFunction类中this.getRuntimeContext().getIndexOfThisSubtask()可以获取编号,但是keyby()的KeySelector类中没有getR
应该是为了 流批一体 。不丢数据
Kyle Zhang 于2022年9月8日周四 08:37写道:
> Hi all,
> 看table
> store的介绍也是关于数据湖存储以及用于实时流式读取的,那在定位上与iceberg、hudi等项目有什么不一样么,为什么要再开发一个项目?
>
> Best.
>
--
Best,
pp
table source 支持了
> >projection 和 limit 下推, 如果有需求做其他 pushdown.可以尝试自行扩展 connector 来实现比如
> >filter/aggregate pushdown 满足前置过滤需求
> >
> >
> >Best,
> >Lincoln Lee
> >
> >
> >r pp 于2022年3月31日周四 18:40写道:
> >
> >> hi,不是很清楚你的问题~ 你的数据量很大,是多久的一天,还是一秒,source怎么就无力了
> >>
>
--
Best,
pp
table source 支持了
> >projection 和 limit 下推, 如果有需求做其他 pushdown.可以尝试自行扩展 connector 来实现比如
> >filter/aggregate pushdown 满足前置过滤需求
> >
> >
> >Best,
> >Lincoln Lee
> >
> >
> >r pp 于2022年3月31日周四 18:40写道:
> >
> >> hi,不是很清楚你的问题~ 你的数据量很大,是多久的一天,还是一秒,source怎么就无力了
> >>
>
--
Best,
pp
hi,不是很清楚你的问题~ 你的数据量很大,是多久的一天,还是一秒,source怎么就无力了
应该基于哪些考虑 ,在这个场景下 只要符合3min内能放下的数据是不是就可以了
>
> best regards!
>
> r pp 于2022年3月27日周日 23:46写道:
>
> > hi~ 因为3min 的Trigger 触发 ,所以,内存里会保存3min内的数据,然后,删除又新增。所以你这边 3min
> > 内总数据量是多少?内存大概多大?可以试着调整TM 的内存量
> >
>
hi~ 因为3min 的Trigger 触发 ,所以,内存里会保存3min内的数据,然后,删除又新增。所以你这边 3min
内总数据量是多少?内存大概多大?可以试着调整TM 的内存量
先编译正确后,再debug
tangzhi8...@gmail.com 于2021年6月28日周一 下午3:02写道:
> 目的:想在本地环境IDEA远程调试Flink
> 步骤:
> 1.这是Debug的配置项
> 2.报错堆栈信息:
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Failed to execute job 'Streaming WordCount'.
> at
> org.apache.flink.client.prog
如果你找到正确的JobManager 和TaskManager 的启动命令,如: Java -Dlog.file =...
找到log4j中 类似:${sys:log.file} , ${env:MAX_LOG_FILE_NUMBER:-10} ,一个是系统参数
-Dlog.file ,一个环境变量。
方法有:
1.直接在 运行命令中 加入新参数,再从 log4j 读取相应参数即可,相对直接一些,有可能需要改动源码
2.从日志名入手,你看无论TM 还是 JM ,看日志就区别好了,只要获取
日志名,再正则(在log4j配置中如何正则呢?)获取你想要的日志关键字,取为Kafka的to
可能和配置文件有关吧,我用的都是默认配置
smq <374060...@qq.com> 于2021年6月8日周二 上午7:10写道:
>
> 图里边可以看到,这个/jobmanager.log /jobmanager.out
> /jobmanager.err中的LOG_DIR应该是一样的,也就是说这三个日志应该是放在一个目录下。至于什么原因少了这个. log
> 确实是不清楚。
>
>
>
> -- 原始邮件 --
> *发件人:* r pp
configure-file
>
>
>
> -- 原始邮件 ----------
> *发件人:* r pp
> *发送时间:* 2021年6月7日 17:24
> *收件人:* smq <374060...@qq.com>, user-zh
> *主题:* 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>
> 好的,如果log 文件就没有产生,是真的没有产生,还有一个flink 的原因是,没有读取到日志的配置文件。
个文件是在一个containerid
> 目录下在一起的,不正常的就是同一个containerID目录下只有这两个文件,这是对此很多次之后发现的。还有就是我说的完全一样的两个程序是打成两个jar
> 包的,这两个程序是在不同模块下,为了找到原因,已经改成了完全一样的程序,但是结果还是之前正常的每次运行都正常,不正常的这个改成跟正常的程序一摸一样还是运行不正常。经过这一周多的测试发现,这个结果不是随机出现的。所以就感觉很奇怪。
>
>
>
> -- 原始邮件 --
> *发件人:* r pp
org.apache.flink.runtime.entrypoint.
ClusterEntrypoint [] - YARN daemon is running as: hdfs Yarn client user
obtainer: hdfs
r pp 于2021年6月4日周五 下午6:11写道:
> 嗨~
> 我这边是 per-job on yarn 的mode
>
> 我查看的 yarn 的container 的日志信息,一般在程序初始化的时候,就会生产日志名,日志路径,变成环境env,
> 然后 形成config ,再启动 cluster。
>
> 而日志路径 是yarn 的配置模
示的。奇怪的一点是我们组有别的程序是正常的,但是一部分在webUI不显示日志。我们目前是从1.10升级到1.12,这种情况在1.12出现的
> >
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人: r pp > 发送时间: 2021年6月2日 15:08
> > 收件人: user-zh > 主题: 回复:flink1.12版本,yarn-application模式Flink web ui看
嗨~ 你们有没有改日志文件的名字
smq <374060...@qq.com> 于2021年6月2日周三 下午12:24写道:
> 你这个解决了吗,我也遇到了同样的问题
>
>
>
>
>
> -- 原始邮件 --
> 发件人: todd 发送时间: 2021年4月14日 19:11
> 收件人: user-zh 主题: 回复:flink1.12版本,yarn-application模式Flink web ui看不到日志
>
>
>
> yarn上只有.out,.error的日志信息,但是从flink web ui的
'properties.transaction.timeout.ms' = '3' 配置的太短了,30s
transactionalId 就过期了。 估计 都来不去启动吧
官网的原文
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
that were started before taking
flink 的反压机制 不就是在限流么?
suisuimu <726400...@qq.com> 于2021年6月1日周二 下午5:37写道:
> Flink从Kafka读取数据时,是否支持用户自定义的限流策略。
> 例如根据消息中的某个字段的名称,设置流控规则。
> 请问是否支持呢?还是需要自己借助第三方组件(例如sentinel)来实现?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
--
Best,
pp
你的网络环境是怎么样? 是在docker 上跑么?还是怎么?
从报错上看,netty 无法解码导致的,但是为什么会出现这样的现象?
或许 你可以把问题贴的在详细一点
5599 <673313...@qq.com> 于2021年6月1日周二 下午2:32写道:
> 退订
>
>
>
>
> -- 原始邮件 ----------
> 发件人: "r pp" 发送时间: 2021年6月1日(星期二) 下午2:07
> 收件人: "user-z
你的程序有挂掉么?
mq sun 于2021年5月31日周一 下午7:23写道:
> 大家好:
> 最近在生产中,不同项目组的两个flink程序在同一时间段都报下面异常
> ERROR org.apache.flink.runtime.blob.BlobServerConnection -Error while
> excuting Blob connection
> .
> .
> .
>
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException
> :Adj
谢谢,好奇为什么要这么做,动态编译么?
Qishang 于2021年5月26日周三 下午1:57写道:
> Hi.
>
> 会生成 `${project.basedir}/target/generated-sources/`
>
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml#L97
>
> r pp 于2021年5月25日周二 上午9:58写道:
>
>
各位好,请问下,
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
在该类下的
/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/
hi~ io.debezium 包版本 是你自己配置的? 还是 官方的? 尝试的解决下,所以问的
guoxb__...@sina.com 于2021年1月13日周三 下午4:32写道:
> HI:
> 大家好,我现在遇到了一个问题,flink在通过cdc的方式读取binlog的方式进行读取日志的时候报错,具体报错如下:
>
> --
hi~ Java 语法不支持,Long 可以设置
赵一旦 于2021年1月7日周四 下午8:13写道:
> 报错信息如下:
> java.lang.IllegalArgumentException: Can not set long field
> com.xxx.buzz.pojo.AbstractDrRecord.timestamp to null value
> at
> sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(
> UnsafeFieldAccessorImpl.java:
hi,没有效果 具体是啥?
cxx <1156531...@qq.com> 于2021年1月7日周四 上午9:53写道:
> 我从kafka消费一条数据,然后将消息进行切分,再发送到下游的kafka中,但是这样不能保证在一个事务里面。
> 例如:我将一条数据切分成10条,然后再第五条的时候抛出一个异常,但是前四条已经发送到下游的kafka了。
> 我设置了事务id,隔离级别,client
> id,enable.idempotence,max.in.flight.requests.per.connection,retries
> 但是没有效果。
>
>
>
> --
> Sen
嗨~ 从flink 的启动 sh 文件里面可以看到,启动java 虚拟机的时候,就设置好 日志文件名了。改了名字,这次的JOB
https://github.com/apache/flink/pull/11839/files
FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${SERVICE}-
${id}-${HOSTNAME}"
log="${FLINK_LOG_PREFIX}.log"
log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=fi
gmail 可能有些不兼容,看不到截图
19916726683 <19916726...@163.com> 于2020年12月24日周四 上午10:51写道:
> hive的官网有介绍ACL,如何继承权限关系。源码在Hive-> HDFSUtils类中 核心代码应该是上面的这点。
>
> Original Message
> *Sender:* Rui Li
> *Recipient:* user-zh
> *Date:* Wednesday, Dec 23, 2020 19:41
> *Subject:* Re: Flink catalog+hive问题
>
> hive的ACL用
表a 在 sql 语句的哪里呢?
关心的真的是过滤问题么? 如果你对你的业务十分熟悉,且了解到 flink1.11 不过 过滤,那为什么 不自行过滤 优化下呢?
如果,不是过滤问题,是大数 join 小数 问题,或者 大数 join 大数问题,是不是可以考虑 广播传播 或者 并行度 的优化方向?
是不是应该 先分析好业务问题,在去看 flink1.12 能否解决问题。
肖越 <18242988...@163.com> 于2020年12月24日周四 上午11:16写道:
> connector 从数据库读取整张表格,执行:
> env.sql_query("select a , b, c
flink 提交到特定的node ,可以保证 其它的任务 不能提交到flink特定的node 上么?
xiao cai 于2020年12月22日周二 上午10:28写道:
> Hi
> 可以考虑使用yarn的node label特性,将flink的任务提交到特定的node上
>
>
> Original Message
> Sender: r pp
> Recipient: user-zh
> Date: Monday, Dec 21, 2020 21:25
> Subject: Re: Flink on yarn 如
编译问题,大多包没下好,多来几次
mvn clean install -DskipTests -Drat.skip=true
亲测有效
shaoshuai <762290...@qq.com> 于2020年12月21日周一 下午4:53写道:
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile
> (default-testCompile) on project flink-parquet_2.11: Compilation failu
程序中,创建表后,执行命令。
kingdomad 于2020年12月21日周一 下午4:55写道:
> flink1.11.2写hive3.12的分区表,flink新创建的分区数据hive无法识别,在hdfs上能看到写入了文件,但是hive读取不了分区。
> 需要执行msck repair table修复分区表后,hive才能读取到数据。
> 求助大佬,要如何解决。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> kingdomad
>
>
嗯,指定几台机子 用于使用flink 运行,为什么 不在yarn 为flink 专门制定 一个队列呢?需要 网络隔离 。。。内网速度多大?
于2020年12月21日周一 下午5:48写道:
> 通过yarn label可以实现
>
> -邮件原件-
> 发件人: user-zh-return-10095-afweijian=163@flink.apache.org
> 代表 yujianbo
> 发送时间: 2020年12月21日 16:44
> 收件人: user-zh@flink.apache.org
> 主题: Flink on yarn 如何指定固定几
sql 的本质其实是 让用户不用关心 是流处理 还是 批处理,比如 ,计算 当天某个视频的点击总数。是一个累加结果,可以实时查询出变化。
但flink 不是一个存储系统,就会存在一个问题,使用sql 状态值 怎么办?
官博 都有说明,也说了哪些算子背后 适用于 Streaming or Batch or both。以及存在的使用注意事项
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/
jiangjiguang719 于2020年12月21日周一 下午7:44写道:
> flink
是的
张锴 于2020年12月19日周六 下午5:45写道:
> 我按官网操作,重写了序列化方式
>
> val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
> props)kafkaSource.assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor[MyType] {
> def extractAscendingTimestamp(element: MyType): Long =
> element.eventTim
我觉得补充完整的 故障信息,以及你的资源配置信息,实例代码 可以更好的让别人回答你的问题
zhy 于2020年12月18日周五 下午4:07写道:
>
> 补充一下,状态后端选择的是rocksdb,检查点间隔为15分钟,超时时间为5分钟,感觉5分钟超时已经很大了,结果检查点线程还是会被中断,是需要继续调大超时时间吗
>
> zhy 于2020年12月18日周五 下午3:57写道:
>
> > hi、
> >
> >
> 我这面在使用flink1.11.2做实时特征的时候,状态大小大约在30g左右的时候任务就不能继续运行了,而查看异常日志发现大量的InterruptedExceptio
你好,能否把 promethus上metrics, rocksdb_block_cache_usage的大小不断上升的
截图发一下,其它rocksdb 的内存图 如果有的话,也发一下
开始时间 到 结束时间 3个 小时的。
867127831 <867127...@qq.com> 于2020年12月18日周五 下午3:15写道:
> Hi,
>
>
> 我在flink 1.11 on k8s上运行了一个双流join的sql,使用rocksdb作为backend,flink
> managed部分的内存由flink托管(state.backend.rocksdb.memo
这个问题 ,一个很朴素的思路 ,你集群里面的在哪里 ,就填哪里咯
Jacob <17691150...@163.com> 于2020年12月18日周五 下午4:13写道:
> Dear all,
>
> 请问在flink在集成hive时候,需要配置hive的conf目录,我的job是on yarn提交的,那么如何配置这个hive conf路径呢?
>
> String name = "myhive";
> String defaultDatabase = "mydatabase";
> String hiveConfDir = ""; /
一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢?
去掉kafka sink ,看下 写入效果。
再对比下 加入kafka 后的效果。
一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了
guoliubi...@foxmail.com 于2020年12月18日周五 下午2:01写道:
> Hi,
>
> 我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
> .process(new Pr
按照我朴素的思路,你的yarn环境可以读取hiveConf 的信息吧。。。
on Yarn 的提交模式,和本地是不同的
另一种是提交的时候 添加配置项
–files $HIVE_HOME/conf/hive-site.xml
Jacob <17691150...@163.com> 于2020年12月19日周六 上午9:26写道:
> Dears,
>
> flink在连接hive时,需配置hiveConf所在路径
>
> 我已经下载了集群中hive-site.xml文件,不知道应该放在哪个目录
>
> Job部署模式是 on Yarn
> ,请问代码中hiveConf应该放在哪个目录下,
一般性的推断是,模式 是属于配置项,若出现问题了,系统读取 或者 改变 配置项,能解决问题么?
之前的学习经验,计算机的解决方案是 出现问题,大都是保护现场,等问题解决后,释放现场。
状态 可以 类比是现场,当问题出现的时候,重点在状态的保护是怎么实现的,和配置没有太大关系,因为完全可以不读取配置。
配置项是面向用户选择 state 的方式,不是解决问题的方式
Storm☀️ 于2020年12月18日周五 上午11:50写道:
> state.backend.incremental 出现问题的时候增量模式是开启的吗?
>
>
>
> --
> Sent from: http://apa
一般性的推断是,模式 是属于配置项,若出现问题了,系统读取 或者 改变 配置项,能解决问题么?
之前的学习经验,计算机的解决方案是 出现问题,大都是保护现场,等问题解决后,释放现场。
状态 可以 类比是现场,当问题出现的时候,重点在状态的保护是怎么实现的,和配置没有太大关系,因为完全可以不读取配置。
配置项是面向用户选择 state 的方式,不是解决问题的方式。
Storm☀️ 于2020年12月18日周五 上午11:50写道:
> state.backend.incremental 出现问题的时候增量模式是开启的吗?
>
>
>
> --
> Sent from: http://ap
嗨。提供一个解决的思路
1.缺包
2在yarn 的环境下缺包,可以把缺的包 放在集群统一的位置,在提交命令时,指名所在包的位置。
Jacob <17691150...@163.com> 于2020年12月18日周五 下午2:01写道:
> Dear All,
>
> Flink.11.2操作hive时,对hive的版本支持是怎样的
>
>
> 看官网介绍是支持1.0、1.1、1.2、2.0、2.1、2.2、2.3、3.1
> 我的执行环境:
>
> *Flink : 1.11.2*
> *Haoop : 2.6.0-cdh5.8.3*
> *Hive : 1.
41 matches
Mail list logo