keyedStream? key不同可能是。
谌祖安 于2021年2月7日周日 下午6:00写道:
> 您好!
>
>
> 重载procesElement方法,每条stream数据处理时,put数据进入map,后面每条数据处理时先判断是否在map中有相同的key值,无则新增后put进map,有则加工后put进map。
> 在实际代码中发现,每次写入put后,map有数据,但处理下一条数据时,先从map读取数据,发现map已经为空。
> 请问是哪里写错了吗? 和 flink官网中 state.update(current);有什么不同吗?
>
> 以下为代码:
>
-defined source or implementation,
I do not know whether the problem have something to do with it.
赵一旦 于2021年2月7日周日 下午4:00写道:
> 截图也没办法反应动态变化的过程。
>
> 目前是10机器的Standalone集群,状态在5G左右。通过flink-client端提交任务,然后web-ui刷新就一直转圈,过一会(几十秒大概)就OK啦,然后刚刚OK一瞬间会有很多个处于Initialize状态的任务,然后慢慢(10s内吧)没掉。
>
> flink-c
截图也没办法反应动态变化的过程。
目前是10机器的Standalone集群,状态在5G左右。通过flink-client端提交任务,然后web-ui刷新就一直转圈,过一会(几十秒大概)就OK啦,然后刚刚OK一瞬间会有很多个处于Initialize状态的任务,然后慢慢(10s内吧)没掉。
flink-client端的话,有时候正常提交完成,有时候出现报错(类似说是重复任务的)。
zilong xiao 于2021年2月7日周日 下午3:25写道:
> 有截图吗?
>
> 赵一旦 于2021年2月7日周日 下午3:13写道:
>
> > 这个问
这个问题现在还有个现象,我提交任务,web
UI就类似卡住状态,过一会刷新出来任务,会有4-5个initialize状态的任务,然后几秒之内陆续消失,剩下1个。
目前怀疑是有什么重试机制,导致重复提交N个任务,然后可能还有什么去重机制,然后其中几个陆续自动停掉?
赵一旦 于2021年1月26日周二 上午10:51写道:
> 如上,目前发现以前很快(10-30s)内能从敲命名到running的任务。现在有时候innitialize阶段就得1-2min。不清楚啥情况。
>
我看Flink的要求是public,每个属性要么public,要么有getter/setter。估计内嵌的属性也会递归检查的。
℡小新的蜡笔不见嘞、 <1515827...@qq.com> 于2021年2月3日周三 下午1:52写道:
> 你好,我们是否可以通过对该类LinkedHashMap进行包装来实现当前功能呢?如果你需要PojoSerializer来序列化数据的话。
>
>
>
>
> --原始邮件------
> 发件人: "赵一旦" 发送时间:
如题,按照flink对POJO的定义,感觉还是比较严格的。
我有个类是继承了LinkedHashMap的,就被当作GenericType了。像这种情况,我没办法去修改LinkedHashMap实现,同时也不好不继承。因为我一个实体是动态扩展,不清楚有多少属性的,需要json方式反序列化到Map类型上。
, 2021 at 11:29:25, 赵一旦 (hinobl...@gmail.com) wrote:
>
> 这个问题有人知道吗?
> 我知道原因了,应该是LongAdder这个类非public,但不知道除了覆盖这个类还有什么解决方案吗?从Flink层面有对这种情况的解决吗。
>
> 毕竟需要用到的类不一定是用户自定义的,就无法保证一定符合某些规则。比如这里的Guava中的LongAdder非public的问题,我目前覆盖了这个类是可以解决的。不清楚有没有其他解决方式。
>
>
> 赵一旦 于2021年1月28日周四 下午6:03写道:
&
如题,在RichFunction中如何获取输入元素类型。TypeInformation。
目前这部分信息封在transformation中,在function层面貌似没有。
function中需要用到,如果可以获取,可以省略一个传参。
如题,报错如下。
2021-02-02 20:44:19
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
95
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at
tStream).broadcast(descriptor)
> mainStream.connect(dimensionStream)
> ...
> ```
> 注:我怕没描述清楚问题:问题在于主流和广播流关联的时候,广播流不一定会先于主流到,导致关联不到配置
>
> 在 2021年2月1日 13:30,赵一旦 写道:
>
>
> FlinkSQL ? javenjiangfsof 于2021年2月1日周一
> 上午11:40写道: > Hi 社区的各位 > >
> 最近也是刚刚开始接触flink,现在是有
FlinkSQL ?
javenjiangfsof 于2021年2月1日周一 上午11:40写道:
> Hi 社区的各位
>
> 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc
> +
> broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
> 1.初始化通过jdbc获取,通过fromCollection处理后,union
>
这个问题有人知道吗?
我知道原因了,应该是LongAdder这个类非public,但不知道除了覆盖这个类还有什么解决方案吗?从Flink层面有对这种情况的解决吗。
毕竟需要用到的类不一定是用户自定义的,就无法保证一定符合某些规则。比如这里的Guava中的LongAdder非public的问题,我目前覆盖了这个类是可以解决的。不清楚有没有其他解决方式。
赵一旦 于2021年1月28日周四 下午6:03写道:
> 如下,我使用到了Guava的BloomFilter,貌似是基于kyro序列化的。检查点成功了,但是基于检查点恢复任务是失败的。
> 报错堆栈如下,关键错误是
所有分区无数据,为什么还期望watermark推进呢?目的是啥。貌似没啥需要计算的呀。
LakeShen 于2021年1月28日周四 下午7:42写道:
> 如果是窗口类聚合,可以尝试一下自定义窗口 Trigger
>
> Best,
> LakeShen
>
> 林影 于2021年1月28日周四 下午5:46写道:
>
> > Hi, Jessica.J.Wang
> > 开源flink看起来没这个功能哈,文档翻了一遍没找到
> >
> > Jessica.J.Wang 于2021年1月28日周四 下午5:25写道:
> >
> > > 你使用的是什么窗口呢,是
此处的trigger你可以理解成是计算结果的输出时机,你的reduce计算还是增量的哈。
xiaolail...@163.com 于2021年1月29日周五 上午11:27写道:
> 您好!最近刚开始学习flink,问一个关于trigger的问题:
>
> 如下的reduce操作:
> env.socketTextStream("localhost", )
> .flatMap(new Splitter())
> .keyBy(value -> value.f0)
>
lement(Context context, Window window, Element element);
>
> void fireWindow(Context context, Window window);
> }
>
> interface WindowedRuntimeContext {
> State getWindowedState(StateDescriptor descriptor).
> }
>
> 把所有 window concepts 交给 WindowOperator,WindowFunctio
问题如title,当然当前Flink的API还是可以实现RichReduceFunction的效果的,就是基于windowFunction或者processWindowFuntion。
但是,windowFunc和reduceFunc最大的区别在于windowFunc是窗口触发操作,而reduce是实时的增量操作。
如果说我的窗口计算对于每个record的到来都需要一个极复杂的操作,我更希望在reduce中完成,而不是windowFunc中完成,因为那样会导致整个窗口所有key的数据几乎在同一时刻触发,这回导致压力变高。
如题,我想知道flink shade了多个包,比如jackson,guava等。
其目的是(1)flink用到这些,为了避免冲突所以shade。还是(2)flink推荐用户直接使用flink shade好的这些包?
如上,我想知道是否“推荐”用户直接使用flink
shade的这些包。还是我们自己去依赖自己的包,比如我当前就用到了jackson,以及guava(我直接用了最新的30-jre的版本)。
如下,我使用到了Guava的BloomFilter,貌似是基于kyro序列化的。检查点成功了,但是基于检查点恢复任务是失败的。
报错堆栈如下,关键错误是什么无法访问public修饰的成员?
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.
StreamTaskStateInitializerImpl.streamOperatorStateContext(
好的。
zilong xiao 于2021年1月26日周二 下午2:13写道:
> Hi
>
>
> flink从1.11开始应该支持log4j,logback,log4j2了,1.11之前的版本只支持前两者,log4j2也是可以用.properties配置的,现在1.12里的默认配置就是log4j2
>
> 祝好~
>
> 赵一旦 于2021年1月26日周二 下午1:27写道:
>
> >
> >
> 网上很多人说log4j2是使用.xml配置。但是flink的conf中只有properties,但是官方文档讲默认使用log4j2?搞蒙了,究竟用的哪个呢。
> >
>
网上很多人说log4j2是使用.xml配置。但是flink的conf中只有properties,但是官方文档讲默认使用log4j2?搞蒙了,究竟用的哪个呢。
如上,目前发现以前很快(10-30s)内能从敲命名到running的任务。现在有时候innitialize阶段就得1-2min。不清楚啥情况。
分,不可以有flink-sql-orc的包。因为和flink-connector-hive中包冲突?
相当于不同地方使用还不一样。
赵一旦 于2021年1月25日周一 下午1:44写道:
> 基于这个回答,还有另一个英文email中有个人说我的hive-common和hive-exec不一致的问题。
>
> 我分析下来,我本身没依赖任何hive-common、hive-exec。唯一引入可能是flink-sql-connector-hive-1.2.2_2.11_1.12.0中的,我看了pom,其中orc的依赖部分是去除了hive的依赖的。然后我还单独引了一个flin
,反正问题估计是可以解决了我。
解决方式是直接不再依赖flink-sql-orc那个包。因为本身我按照官方文档加了flink-sql-connector-hive的包,同时我分析了这个包内已经shade了orc的包。因为并不需要单独搞一个flink-sql-orc的包。刚刚初步试了下,没问题。还没继续多实验。
赵一旦 于2021年1月25日周一 下午12:59写道:
> 我hive版本应该是1.2.1,我看spark部分依赖的1.2.1的包。
> 此外,关于hive配置。此处我需要问下,flink集群需要有hive的依赖嘛是?我flink集群本身没加任何hive依赖。
&
11:32写道:
> 你好,
>
> 关于分区字段的filter,flink与hive的隐式类型转换规则不同,建议在写where条件时按照分区字段的类型来指定常量。
> 关于读ORC的异常,请问你的hive版本是多少呢?另外hive配置中是否指定过hive.txn.valid.txns参数?
>
> On Sun, Jan 24, 2021 at 6:45 AM 赵一旦 wrote:
>
> > 补充(1)FlinkSQL的查询,对于分区字符串字段貌似必须加'',不加就查询不到?如上hour=02这种直接导致no more split
大佬门帮忙分析下,从日志以及实验过程来看,和ORC有关。
SQL很简单,随意一个select就可以。
异常日志如下:
2021-01-24 04:41:24,952 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler [] -
FATAL: Thread 'flink-akka.actor.default-dispatcher-2' produced an
uncaught exception. Stopping the process...
此外,还有我发现Parquet格式是可以的,顺便看了下FlinkStreamConnector种,FileSink的ParquetBulkFomart。
然后文档讲到ParquetAvroWriters,这种格式写的文件对应hive表怎么创建?貌似默认stored as
parquet的话,不带任何avro的信息呀。
赵一旦 于2021年1月24日周日 上午6:45写道:
> 补充(1)FlinkSQL的查询,对于分区字符串字段貌似必须加'',不加就查询不到?如上hour=02这种直接导致no more split。
> 其次(2)去除这部分问题后,目前可以发现有spli
补充(1)FlinkSQL的查询,对于分区字符串字段貌似必须加'',不加就查询不到?如上hour=02这种直接导致no more split。
其次(2)去除这部分问题后,目前可以发现有split了,但是报了orc相关的错误。并且提交SQL会导致JM直接失败。JM日志如下:
2021-01-24 04:41:24,952 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler [] -
FATAL: Thread 'flink-akka.actor.default-dispatcher-2' produced
SQL很简单,如下:
select * from test2
where `dt`=20210124 and `hour`=02 and `supply_id`=2027
limit 1000;
提交之后很快就finished,并且没有查询到任何数据。
但实际是有数据的,通过spark-sql确认过相同的语句可以查询到数据。
看了JM和TM的日志中有No more splits available。
目前来看貌似一个split也没有。这个应该是1.12的新sourceAPI。
不清楚是不是bug,还是有什么使用注意点呢?
on yarn是另一回事情,yarn自身有自身对资源的衡量方式,就是vcore。
你设置你的作业25并行,默认yarn可能就是按照1个并发对应1个vcore,就这么个意思。肯定有参数可以调整的,你可以让1个并发对应10个vcore都没问题。
就是一种衡量方式而已。
赵一旦 于2021年1月23日周六 下午8:20写道:
> 按照你的理解,别说服务器了,你自己的笔记本电脑都别工作了。你电脑才几核,开机后进程数量就超过CPU数了,这些进程难道不工作了?
>
> 多线程、多线程,不是说N核的就只有N个线程呀。
>
> 你standalone,每个机器想设置多少
按照你的理解,别说服务器了,你自己的笔记本电脑都别工作了。你电脑才几核,开机后进程数量就超过CPU数了,这些进程难道不工作了?
多线程、多线程,不是说N核的就只有N个线程呀。
你standalone,每个机器想设置多少个slot都没问题,和你的机器CPU没关系懂了嘛。“没关系”。
Jacob <17691150...@163.com> 于2021年1月23日周六 下午4:35写道:
> 谢谢回复~
>
> 在我的理解中,并行度数量不应该超过CPU的数量的。
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from:
非常感谢,这个封装真好,直接整个类都可以拿来用了。
Kezhu Wang 于2021年1月23日周六 下午6:00写道:
>
> https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java#L259
>
>
> On January 23, 2021 at 13:49:23, 赵一旦 (hinobl...@gmail.com) wrote:
>
> 目前通过自定义OrcB
; }
> join_time = DateUtil.convertTimeStamp2DateStr(minTs,
> DateUtil.SECOND_DATE_FORMAT)
> leave_time = DateUtil.convertTimeStamp2DateStr(maxTs,
> DateUtil.SECOND_DATE_FORMAT)
> duration = (maxTs - minTs) / 1000 //停留多少秒
> duration_time = DateUtil.secondsToFormat
目前通过自定义OrcBulkWriterFactory方式,拿到一个一个的ColumnVector,然后设置值。
对于简单类型,API看起来很清晰,但是Map类型没看懂怎么写。如下,对于serverTime是INT类型,直接设置vector[rowId]即可。那么对于MapColumnVector怎么设置呢,将多个key-value对写进去具体怎么写呢。
serverTimeColumnVector.vector[rowId] = ele.getTimestamp();
MapColumnVector dColumnVector = (MapColumnVector)
此外,写ORC格式文件,对于Map格式的有人知道怎么写的话给个示例吧。
如下,拿到MapColumnVector之后怎么写呢,目前非Map的简单字段都比较清晰,直接设置xxxColumnVector.vector[rowId]的值即可。但是MapColumnVector的API比较乱,没看懂怎么用。
MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];
赵一旦 于2021年1月23日周六 下午1:42写道:
> 已解决。覆盖了flink这部分源码去除了对非hdfs的schema限制。
>
&
已解决。覆盖了flink这部分源码去除了对非hdfs的schema限制。
张锴 于2021年1月21日周四 下午7:35写道:
> @赵一旦
> 另外,上次我还提了一个问题请教你,我试了你说的那个想法,但是好像有点问题,你可以看一下
>
> 张锴 于2021年1月21日周四 下午7:13写道:
>
> > 我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的
> >
> > 赵一旦 于2021年1月21日周四 下午7:05写道:
> >
> &
已解决。重改写了flink源码覆盖了这部分限制就可以了。
赵一旦 于2021年1月22日周五 上午10:17写道:
> 如题,为什么仅支持recoverableWriter,如果我使用的文件系统不支持怎么办呢,必须自定义sink吗?
>
>
> 我这边用的是一个公司自研的大型分布式文件系统,支持hadoop协议(但不清楚是否所有特性都支持),目前使用streamFileSink和FileSink貌似都无法正常写。
>
> 不清楚是否有其他问题,至少当前是卡住在这个recoverable上了。
>
> 报错是只有hdfs才支持recove
不清楚你为啥需要想这些,集群的并行度你随意设置就好,考虑CPU核数等的地方都只是考虑理想情况的并发。
比如你CPU最高10个核,来20个线程也没办法“并行”,但是可以“并发”。如果你的线程事情很少,10个并发是无法占满10个CPU核的,所以没任何理由因为CPU核的数量去限制你的并发度。
Jacob <17691150...@163.com> 于2021年1月22日周五 下午4:18写道:
> 使用Flink以来,一直有一个问题困扰着。
>
>
> Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。
>
> 比如Flink消费kafka
>
>
如题,为什么仅支持recoverableWriter,如果我使用的文件系统不支持怎么办呢,必须自定义sink吗?
我这边用的是一个公司自研的大型分布式文件系统,支持hadoop协议(但不清楚是否所有特性都支持),目前使用streamFileSink和FileSink貌似都无法正常写。
不清楚是否有其他问题,至少当前是卡住在这个recoverable上了。
报错是只有hdfs才支持recoverableWriter。
有人知道如何解吗?
oin_time, leave_time, created_time, modified_time
> , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime)
>
> }
>
>
> 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?
>
>
>
>
> 赵一旦 于2020年12月28日周一 下午7:12写道:
>
> 按直播间ID和用户
我表达的方法是按照session
window将数据分窗口,实际就是把连续1min没数据的那部分数据给你分割好,然后这部分数据中的最大时间戳和最小时间戳的差就是你要的结果理论上。
实现的话就是用2个状态,分别保存最大最小时间戳,没进来一个数据,对比更新最大最小时间戳即可。
然后在窗口被触发的时候将结果输出。使用reduce+procesWindowFunction配合。reduce的时候只计算最大最小,触发的时候才基于2个状态计算duration输出结果。
赵一旦 于2021年1月21日周四 下午8:28写道:
> 我其实没看懂你逻辑。这个和窗口的最大最小时间戳
erId,
> courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
> join_time, leave_time, created_time, modified_time
> , liveType, plat_form, duration, duration_time,
> network_operator, role, useragent, uid, eventTime)
>
> }
>
>
> 这样写是否合适,如果要用min/max应该如何
根据异常信息以及API源码执行进行分析,确定是否能直接写入。如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API可以参考各个sink端的写法
> 在 2021-01-21 18:45:06,"张锴" 写道:
> >import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
> >DateTimeBucketer}
> >
> >sink.setBucketer sink.s
@Michael Ran
然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。
Michael Ran 于2021年1月21日周四 下午5:23写道:
> 这里应该是用了hdfs 的特定API吧,文件系统没兼容public
> HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
> 在 2021-01-21 17:18:23,"赵一旦&quo
每个节点,即进程,直接监控进程的cpu,内存就可以。没有更小的粒度。
通信的话看进程的io读写,网络读写等吧。此外flink的rest api可以获取flink web
ui能看到的全部信息,比如节点之间已发送records数量等。
penguin. 于2021年1月18日周一 上午10:55写道:
>
> 那请问对于每个节点的CPU、内存使用率以及节点之间的通信量如何进行实时监控获取数据呢?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
除此以外,FlinkSQL读现有的hive数据仓库也是失败。配置okhive的catalog,表信息都能出来,但select操作就是失败。
赵一旦 于2021年1月21日周四 下午5:18写道:
> 具体报错信息如下:
>
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop
> are only supported for HDFS
> at org.apache.flink.runtime.fs.hdfs.HadoopRe
)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
赵一旦 于2021年1月21日周四 下午5:17写道:
> Recoverable writers on Hadoop are only supported for HDFS
>
> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
>
> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
>
>
>
Recoverable writers on Hadoop are only supported for HDFS
如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
我将线上的hive-site文件复制到了flink的conf中,并且相关jar都放好。
使用flink的sql-client的-l方式将相关jar指定,启动了sql-client。
然后catalog,databases,tables等信息都能查询到了。
但是select * from xxTable where dt=''; 就会有问题。
看了flink集群报错,这个错误直接导致flink的standalonesession进程失败会。
报错如下:
2021-01-21 13:43:42,818 INFO
key和value都是你自己设置的,看你需要设置什么类型哈。这个不是强制的。
你的map state的key和value在具体业务场景下需要什么类型,那个地方就设置什么类型的TypeInformation,懂吧。
smq <374060...@qq.com> 于2021年1月18日周一 下午12:18写道:
> 发自我的iPhone
>
>
> -- 原始邮件 --
> 发件人: 明启 孙 <374060...@qq.com
> 发送时间: 2021年1月18日 11:30
> 收件人: user-zh 主题:
我看你还写到 “每分钟触发统计一次结果”,你是不是做了自定义trigger啥的,导致逻辑不对了。
默认情况就可以实现你要的效果,不要自定义trigger哈这里。
赵一旦 于2021年1月18日周一 下午3:52写道:
> 补充下,是针对每个key,每1min输出一个结果。采用1h窗口,1min滑动窗口。
>
>
>
> 赵一旦 于2021年1月18日周一 下午3:51写道:
>
>> 你貌似没懂我的意思。我的意思是你的需求和sliding window的效果应该就是一样的。
>> 不是很清楚你表达的最早什么的是什么
补充下,是针对每个key,每1min输出一个结果。采用1h窗口,1min滑动窗口。
赵一旦 于2021年1月18日周一 下午3:51写道:
> 你貌似没懂我的意思。我的意思是你的需求和sliding window的效果应该就是一样的。
> 不是很清楚你表达的最早什么的是什么含义。
>
> 基于event time做sliding window,只要你没设置其他东西,就应该是1min输出一个结果而已哈。
>
> eriendeng 于2021年1月18日周一 上午11:42写道:
>
>> 只要sliding出来才满足你每分钟
你貌似没懂我的意思。我的意思是你的需求和sliding window的效果应该就是一样的。
不是很清楚你表达的最早什么的是什么含义。
基于event time做sliding window,只要你没设置其他东西,就应该是1min输出一个结果而已哈。
eriendeng 于2021年1月18日周一 上午11:42写道:
> 只要sliding出来才满足你每分钟执行的滑动要求吧?至于你只要最早/新的某一条,你可以从window groupby得出的流再次group
> by然后再用window时间筛选你要的数据。
>
>
>
> --
> Sent from:
从你的描述来看,你说的貌似就是sliding window呀。
9-10,9.01-10.01...
marble.zh...@coinflex.com.INVALID
于2021年1月15日周五 下午5:45写道:
> 大家好, 我现在的场景需求是,窗口size是1个小时,每分钟触发统计一次结果。比如现在是10点钟,则统计9点到10点的结果。
> 下一分钟则在10:01分时触发统计9:01到10:01的结果。
>
> 如果用Sliding window, 比如.timeWindow(Time.hours(1L), Time.minutes(1)),
>
slot好像只是逻辑概念,监控意义不大,没有资源隔离。
penguin. 于2021年1月15日周五 下午5:06写道:
> Hi,
> flink集群中,能对TaskManager的每个TaskSlot进行监控吗?比如每个slot的cpu和内存使用率之类的指标。
>
>
> penguin
Hi,问题解决了。不清楚你为啥需要启动hiveServer2呢?貌似不需要。flink只需要用到hms吧。
RS 于2020年10月30日周五 上午9:57写道:
> Hi,
>
> 谢谢,应该是HMS的问题, 原来是需要配置remote的HMS,之前都是local模式
> 我执行了一下流程:
> 1. 清理了旧的数据库和数据目录
> 2. 重新初始化 schematool -dbType mysql -initSchema
> 3. 启动hive --service metastore, 成功监听端口9083端口
> 4. 启动hiveserver2,
Evan说的这个是一个设置,但也仅影响cancel那个命令,stop还是会删除。这个点其实做的不是很好,不清楚为啥,之前Q过,没人鸟。。。
所以按照我的经验,如果是需要停止并基于保存点重启,那还好。如果计划基于检查点重启,无比提前备份检查点,然后停任务,然后复制备份回去。
在或者,直接cancel,不用stop。
Evan 于2021年1月14日周四 下午6:49写道:
> 是的,应该是机制问题,链接[1]打开有这样一句解释:
>
> If you choose to retain externalized checkpoints on cancellation you have
机制就是这样的。如下是我之前做过的测试。
启动后等待若干检查点之后做如下操作文件系统上的检查点是否保留说明
WEB UI 点击 Cancel 方式取消任务 保留 合理,因为设置了 RETAIN_ON_CANCELLATION。
通过命令生成保存点:flink savepoint ${jobId} ${savepointDir} 保留 OK
通过命令取消任务:flink cancel ${jobId} 保留 OK
通过命令取消任务并生成保存点:flink cancel -s ${savepointDir} ${jobId} 保留 OK
通过命令停止任务(基于默认保存点目录):flink
s. If Flink fails because messages exceed
> this limit, then you should increase it. The message size requires a
> size-unit specifier.
>
> 参考:
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
>
>
>
>
>
> 发件人: 赵一旦
> 发送时
报错500,开发者工具展示的异常信息如下。
"
想知道这个是什么情况,以及需要调整什么参数呢?
个人观点:
这个应该不可以,你提交的任务最终实际是打包给tm去执行的,使用的是tm的日志配置,而不是你自己的配置。
你自己那个配置仅仅用于本地调试启动的时候有效。
nicygan 于2021年1月13日周三 上午9:55写道:
> dear all:
> 我的flink任务提交到yarn运行,
> 默认生效的是日志配置是flink/conf中的log4j.properties。
> 但我的应用jar包中还有一个log4j2.xml,这里面配置了KafkaAppend,要把日志发送到kafka。
> 我要如果设置,才能让这两个配置文件都生效呢?
描述还是不清晰。
watermark是定期生成,你获取的时候不一定已经更新。
何宗谨 于2021年1月13日周三 上午10:20写道:
>
> 允许的时间间隔是3秒,每次打印的都是上一个时间戳的watermark,但是使用的好像还是这次的
>
> -- 原始邮件 --
> *发件人:* "user-zh" ;
> *发送时间:* 2021年1月13日(星期三) 上午10:02
> *收件人:* "user-zh";
> *主题:* Re:
图挂了。
何宗谨 于2021年1月13日周三 上午9:20写道:
>
>
>
>
>
设置下镜像可以。
Yun Tang 于2021年1月12日周二 下午5:37写道:
> Hi,
>
> 国内网络环境不太好,其实问题是node.js 安装有问题,可以考虑单独安装一下node
> js和npm,如果还是不行,在不需要webui的前提下,可以编译时候加上profile “-Pskip-webui-build”
> 来skip掉该部分的编译。
>
> 祝好
> 唐云
>
> From: Ruguo Yu
> Sent: Tuesday, January 12, 2021 14:00
> To:
使用Flink的rest
api不可以嘛。我是standalone集群,写个python脚本,写了个list为expected_jobs,如果发现集群没这个job就报警。
Yun Tang 于2021年1月8日周五 上午10:53写道:
> 因为numRestarts 是一个累计值,所以你得区分当前值和之前的数值是否发生了增加,来区分是否发生了failover。
>
>
>
检查点无错误,但检查点配置的后端sdk有报错,所以不清楚这个错误究竟有没有影响。下面是报错堆栈,帮忙分析下这个是写检查点数据的过程吗?如果是的话,404是什么意思。找不到?找不到啥。。。
com.baidubce.BceServiceException: Not Found (Status Code: 404; Error Code:
null; Request ID: 624d3468-8d7b-46f7-be5d-750c9039893d)
at
报错信息如下:
java.lang.IllegalArgumentException: Can not set long field
com.xxx.buzz.pojo.AbstractDrRecord.timestamp to null value
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(
UnsafeFieldAccessorImpl.java:167)
at
没有的。
penguin. 于2021年1月7日周四 下午1:04写道:
> 赵一旦:
> 所以目前是否有办法来实现在提交任务后,将这个任务的subtask调度到指定机器的某个slot来执行呢。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-01-07 12:57:35,"赵一旦" 写道:
> >不一样的哈。不是一个层次的东西。
> >调度平台指的是在指定时间自动帮你提交某个任务,或
不一样的哈。不是一个层次的东西。
调度平台指的是在指定时间自动帮你提交某个任务,或者每天定时提交某个任务等。
后者是flink内部的机制,指提交任务后,这个任务的每个subtask应该使用哪台机器哪个slot去执行。
penguin. 于2021年1月7日周四 下午12:50写道:
> 赵一旦:
> 你说的任务调度平台是指通过这种平台来完全控制flink中的task到具体某个节点的调度吗?
> 我想的是flink自己内部的task到节点的调度。比如说通过修改flink现在的调度部分的代码来实现。
> 是不是这两种都可以用来实现 根据我们自己的需求来决定将t
你说的是任务调度有2层含义。一种任务调度平台(这个很常见)。还是flink自身的task的schedule,这个是很复杂。
penguin. 于2021年1月7日周四 上午10:32写道:
>
>
>
> 我在知网的一篇论文中看到有作者做的flink任务调度,但是发了邮件很久也没人回复。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-01-07 10:21:27,"赵一旦" 写道:
> >是的,之前有人给过
是的,之前有人给过我回复,说当前flink的调度信息不足,导致无法做到很理想的调度。
penguin. 于2021年1月7日周四 上午10:11写道:
>
> 我在jira上看到好像有人在做,但是好像无法获取到更多的信息。也不知道他们是怎么做的。主要应该是找到进行任务调度那块的代码,不过源码注释好像很少,很困难。
>
>
>
>
>
>
>
>
>
> 在 2021-01-06 13:06:20,"赵一旦" 写道:
> >我不是很清楚,不过难度应该很大,不然社区早改了。当前任务经
我比较倾向于是网络原因。但flink的日志目前无法很明显反映和确认。期望有人从flink反压机制角度考虑下,有没有因为网络“抖动”,比如长连接断开等问题导致反压的case。而且这种情况是否会自动恢复呢?从我的几次经验来看我不重启就不恢复。。。
赵一旦 于2021年1月6日周三 下午11:43写道:
> 如题,反压的原因,不考虑计算压力大,并行度不合理等问题。
> 比如是否可能和网络也有关呢?
>
> 考虑如下case,A->B->C这么一个拓扑,我A(source)结点反压100%,数据彻底不再发送,但B和C都不反压。但是B、C都是非常简单(不可
如题,反压的原因,不考虑计算压力大,并行度不合理等问题。
比如是否可能和网络也有关呢?
考虑如下case,A->B->C这么一个拓扑,我A(source)结点反压100%,数据彻底不再发送,但B和C都不反压。但是B、C都是非常简单(不可能存在性能问题)。那这还有什么解释吗?
比如,A和B之间网络是否可能出问题呢?
此外,从机器cpu等监控来看,出现反压后,cpu
idle提升,即反压到cpu利用率直接降低,且cpu在附近实际无升高的迹象。因此不会是瞬间有压力来导致反压。
我当前怀疑和网络有关,有人知道如何确认吗。这种case是否有可能自动恢复呢。
可以看下文档去,配置忽略解析错误。
air23 于2021年1月6日周三 上午10:41写道:
> 你好 这边使用flink sql有如下问题;
>
>
>
>
>
>
> CREATE TABLE source1 (
> id BIGINT ,
> username STRING ,
> password STRING ,
> AddTime TIMESTAMP ,
> origin_table STRING METADATA FROM 'value.table' VIRTUAL
> ) WITH (
> 'connector' =
at [Source:UNKONWN; line: -1, column:
> -1;]
>
> 这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。
>
> 但是如果遇到了kafka消息为空的情况,这边不能处理吗?
>
> 赵一旦 于2021年1月5日周二 下午9:18写道:
>
> > 我感觉还是jar的问题。如下尝试下,我懒得去试了。
> > 将 org.a
我不是很清楚,不过难度应该很大,不然社区早改了。当前任务经常导致机器资源不均衡,这个问题很常见。
penguin. 于2021年1月6日周三 上午11:15写道:
> Hi,请问大家知道怎么更改flink默认的任务调度方式吗?
那为什么没有日志呢,去机器看日志呗。
于2021年1月6日周三 上午10:11写道:
> 应该是状态大,超时设了10分钟,还没有达到超时时间。到处找不到相关日志。
>
> 发自我的iPhone
>
> > 在 2021年1月6日,10:03,赵一旦 写道:
> >
> > 没日志咋排查,为啥失败总得说下。超时的话可能就是任务压力大,状态大等。
> >
> > 于2021年1月6日周三 上午9:53写道:
> >
> >> flink 1.11.2环境下,三个kafk
你这个方法就可以的哈,至于第二个窗口又聚到一个结点的问题本身就是原始问题,基于你的方法缓解即可,第二层不可避免的。
你需要做的是调整合理的参数,使得第二层的数据虽然不均衡,但数据量以及足够低就可以了。
此外,还需要注意,当前key数量假设1w,加10随机就是10w,加100随机就是100w。这个key的膨胀也很严重的。最好的做法是仅针对高数据量的key分拆。
syumialiu 于2021年1月5日周二 下午11:53写道:
>
>
没日志咋排查,为啥失败总得说下。超时的话可能就是任务压力大,状态大等。
于2021年1月6日周三 上午9:53写道:
> flink 1.11.2环境下,三个kafka topic 的数据进行join,出现checkpoint失败,没有日志,请问怎么排查?
>
> 发自我的iPhone
我感觉还是jar的问题。如下尝试下,我懒得去试了。
将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为
org.apache.flink.kafka.shaded.org.apache.kafka.common.securi
ty.plain.PlainLoginModule
因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。
Carmen Free 于2021年1月5日周二 下午5:09写道:
> flink sql
可以看看是否反压。反压说明并行度还是不够,不反压的话看处理速度是否符合预期。符合预期就不用调了,说明你的任务不复杂,那点cpu占用就够了。如果不符合预期,也没有任何反压,那就是source消费速度太慢。
housezhang 于2021年1月5日周二 下午5:44写道:
> 有可能是cpu能够处理得过来,网络io处理不过来了,看看网络使用情况
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
这个问题en...出在如下地方:
KeyedStream keyByStream =
signoutTimeAndWM.keyBy(new KeySelector() {
@Override
public String getKey(ShareRealTimeData value) throws Exception {
return DateUtilMinutes.timeStampToDate(new
Date().getTime()); // 此处,不可以使用new
不纠结几核。如果任务结点本身不多的话,可以提一提再,只要network buffer数量够就好。
爱吃鱼 于2021年1月5日周二 下午4:39写道:
> 24核的机器,已经加到了24的并行度了,然后会 24个并行度的cpu利用率加起来也是140%左右
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-01-05 16:24:30,"赵一旦" 写道:
> >加大并行度。
> >
> &
加大并行度。
爱吃鱼 于2021年1月5日周二 下午4:18写道:
> 怎么提高flink cpu利用率。
> 业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
> flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
> cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。
当前vertex的结点监控,有个获取全部指标的接口,和基于get参数逗号分割获取指标值的接口。
现在问题是我的采集脚本在获取监控值时候,因为是get导致超长,于是我5个5个的获取,但这导致我每30s一次采集,每次采集上百次请求,耗时达到几十秒。
是否可以搞个post接口;或者在metrics那个获取全部metric指标id的接口中就直接返回全部value呢?
ll掉job,第二天1:30重启,结果还是在昨天的结果上累加的,执行的kill-job好像并不能释放state,这个真的一点办法都没有了吗?
>
>
>
> --原始邮件------
> 发件人: "赵一旦" 发送时间: 2020年12月29日(星期二) 晚上9:35
> 收件人: "user-zh" 主题: Re: flink 1.12 Cancel Job内存未释放(问)
>
>
>
> 不可以吧。任务是任务。taskManager
1 kafka table和group id是啥意思。group id随意写一个就可以了。
2 本身就可以复用。
3 听不懂表达啥。
HideOnBushKi <1405977...@qq.com> 于2021年1月4日周一 下午3:43写道:
> 大佬们好,现在业务需求多,生产作业开始变得繁重,请教3个生产业务场景,主要是想看下各位有什么优雅的思路
>
> 1.kakfa table和group_ID应该怎么去维护呢?一个业务一张表吗?
> 2.如何做到复用表的效果?
> 3.如果新增一个业务需求,用到之前的一张kafka table,似乎要在一个程序里。执行多次
这个问题基本分析应该没啥问题,发出来给大家参考借鉴。
赵一旦 于2020年12月31日周四 下午1:01写道:
> 目的呢如题:先基于process_time预聚合,最后基于event_time聚合。
>
> 预聚合使用10s窗口,最终聚合使用5min窗口,且使用10s的continuous trigger。
>
>
> 同时,为了避免2个5分钟窗口的数据在窗口临界位置时候,被10s的预聚合到一起(错误case),我在预聚合的时候使用的key中多加了个字段(time),time是格式化到5分钟的结尾时间的time。因此这个问题可以忽略。
>
目的呢如题:先基于process_time预聚合,最后基于event_time聚合。
预聚合使用10s窗口,最终聚合使用5min窗口,且使用10s的continuous trigger。
同时,为了避免2个5分钟窗口的数据在窗口临界位置时候,被10s的预聚合到一起(错误case),我在预聚合的时候使用的key中多加了个字段(time),time是格式化到5分钟的结尾时间的time。因此这个问题可以忽略。
但是呢,目前发现一个更大的问题。最终窗口输出的key+time的pv存在变小的情况。刚开始很奇怪,想了很久。然后分析出一些问题。
实际key+time1=2:
不可以吧。任务是任务。taskManager是taskManager。 taskManager是提前启动好的一个进程,任务提交的时候会由
taskManager 帮你执行。cancel后taskManager继续它自己的事情(比如等新的任务)。
或者考虑yarn方式,per-job模式啥的。
徐州州 <25977...@qq.com> 于2020年12月29日周二 上午9:00写道:
> 请教一下,我flink
>
按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。
或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。
session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。
张锴 于2020年12月28日周一 下午5:35写道:
> 能描述一下用session window的考虑吗
>
> Akisaya 于2020年12月28日周一 下午5:00写道:
>
>
是否有必要将并行度设置为128的约数我意思是。
Shengkai Fang 于2020年12月28日周一 上午10:38写道:
> hi, 如果热点是某个key的数据量较大造成的,那么re-partition依旧无法解决这个问题。
> 个人认为最好的解决办法是基于window的 mini-batch 以及 local-global agg,社区正在解这类问题,可以关注下[1]
>
> [1]https://issues.apache.org/jira/browse/FLINK-19604
>
> 赵一旦 于2020年12月28日周一 上午10:3
aster/dev/table/config.html#table-exec-resource-default-parallelism
>
> 赵一旦 于2020年12月27日周日 下午12:44写道:
>
> > 了解下多少数据量呀,128的并发其实很高了感觉。
> >
> > guaishushu1...@163.com 于2020年12月26日周六 下午5:39写道:
> >
> > > Flink
> > >
> >
> SQL中Source和sink可以通过修改c
首先,要保重在保留数量范围内。
其次,你的任务不能是stop的任务,flink会在任务stop的时候删除所有检查点。
cancel的时候不会删除。
Yun Tang 于2020年12月27日周日 下午5:55写道:
> Hi
>
> 既然UI上已经显示成功了,一定是成功且成功保存到HDFS上了,可以看下父目录的情况,chk-x 目录可能随着新的checkpoint完成而被删除
>
> 祝好
> 唐云
>
> From: chen310 <1...@163.com>
> Sent: Friday,
了解下多少数据量呀,128的并发其实很高了感觉。
guaishushu1...@163.com 于2020年12月26日周六 下午5:39写道:
> Flink
> SQL中Source和sink可以通过修改connector配置实现并发度配置,而其他算子的并发度都是根据Source并发度来设置的,这样最多是128个并发度。但是有些算子做聚合等处理,128并发明显不够这个应该怎么解决呢?支持通过配置设置其他算子并发度吗?
>
>
>
> guaishushu1...@163.com
>
如下
Exception in thread "main" java.lang.UnsupportedOperationException:
Checkpointing is currently not supported by default for iterative jobs, as
we cannot guarantee exactly once semantics. State checkpoints happen
normally, but records in-transit during the snapshot will be lost upon
failure.
The
如题,有人知道关键词吗,每次失败日志太多哦。
显示各种task的cancel等。
最后突然就失败了。。。
目前感觉经常是因为cancel(180s)。导致Task did not exit gracefully within 180 + seconds。
此外,大家生产中会修改日志格式和日志文件吗。我调整了之后WEB-UI上那个日志从来没能看过。现在虽然有个日志list,但点了也没效果。
我调整了日志文件名。
报错信息看下:Caused by:
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.NumberFormatException: Not a version: 9。
bigdata <1194803...@qq.com> 于2020年12月24日周四 下午9:49写道:
> flink1.10.1集群dml报错如下
>
I don't believe what you say...
nicygan 于2020年12月24日周四 下午7:25写道:
> dear all:
> 我在checkpoint设置中,设置了
>
> checkpointConfig.setMinPauseBetweenCheckpoints(180_000L)
> 但是好像并没有生效,
> 比如id=238的结束时间为17:13:30
> 但是id=239的开始时间也是17:13:30
>
>
> 我的理解id=239的开始时间至少应该是17:16:30
>
>
如果我设置了并行度为130,那么最大并行度会自动提升吗? -- 应该是的,否则不可能有数据收到。
但我没看到源码在哪调整的最大并行度。
没有大神懂这个的吗?帮忙分析下。
赵一旦 于2020年12月22日周二 上午12:05写道:
> 目前来说,按照我讲的方式去实现应该不难。我怕的是flink在恢复keyedState的时候,无法适应我的这种partition机制。
>
> 现有的机制,restore的时候实际是 keyGroup 到window并行实例之间的一个重分配。
>
> 换成我的partition机制后,能否还正常restore呢?
>
> 赵一旦 于2020年12月22日周二 上午12:03写道:
>
>> 如题,目前
共有 281 项搜索結果,以下是第 1 - 100 matches
Mail list logo