?????? ??????????????????????flink state

2021-01-22 文章 ??????
TTL??keybykey1state?? ---- ??: ""https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows news_...@163.com

求一个简单的示例,flink写orc格式文件,对于Map复杂类型的写法。

2021-01-22 文章 赵一旦
目前通过自定义OrcBulkWriterFactory方式,拿到一个一个的ColumnVector,然后设置值。 对于简单类型,API看起来很清晰,但是Map类型没看懂怎么写。如下,对于serverTime是INT类型,直接设置vector[rowId]即可。那么对于MapColumnVector怎么设置呢,将多个key-value对写进去具体怎么写呢。 serverTimeColumnVector.vector[rowId] = ele.getTimestamp(); MapColumnVector dColumnVector = (MapColumnVector)

Re: Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-22 文章 赵一旦
此外,写ORC格式文件,对于Map格式的有人知道怎么写的话给个示例吧。 如下,拿到MapColumnVector之后怎么写呢,目前非Map的简单字段都比较清晰,直接设置xxxColumnVector.vector[rowId]的值即可。但是MapColumnVector的API比较乱,没看懂怎么用。 MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2]; 赵一旦 于2021年1月23日周六 下午1:42写道: > 已解决。覆盖了flink这部分源码去除了对非hdfs的schema限制。 > > 张锴

Re: Re: Re: Flink写hdfs提交任务就报错。Recoverable writers on Hadoop are only supported for HDFS

2021-01-22 文章 赵一旦
已解决。覆盖了flink这部分源码去除了对非hdfs的schema限制。 张锴 于2021年1月21日周四 下午7:35写道: > @赵一旦 > 另外,上次我还提了一个问题请教你,我试了你说的那个想法,但是好像有点问题,你可以看一下 > > 张锴 于2021年1月21日周四 下午7:13写道: > > > 我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的 > > > > 赵一旦 于2021年1月21日周四 下午7:05写道: > > > >> @Michael Ran; 嗯嗯,没关系。 > >> > >> @张锴

Re: Flink的StreamFileSink和1.12提供的FileSink中,BucketsBuilder的createBucketWriter中仅支持recoverableWriter。

2021-01-22 文章 赵一旦
已解决。重改写了flink源码覆盖了这部分限制就可以了。 赵一旦 于2021年1月22日周五 上午10:17写道: > 如题,为什么仅支持recoverableWriter,如果我使用的文件系统不支持怎么办呢,必须自定义sink吗? > > > 我这边用的是一个公司自研的大型分布式文件系统,支持hadoop协议(但不清楚是否所有特性都支持),目前使用streamFileSink和FileSink貌似都无法正常写。 > > 不清楚是否有其他问题,至少当前是卡住在这个recoverable上了。 > > 报错是只有hdfs才支持recoverableWriter。 > >

?????? ????????????Orders????????????????join????????????????????

2021-01-22 文章 ??????
=_= ??thank you?? ---- ??: "yang nick"

Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。

2021-01-22 文章 yang nick
我感觉你这个用实时很难做,涉及到状态更新的无限流,需要配置 state ttl 徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:23写道: > > 我遇到的难题是,拒收订单想拿到payAment字段必须扫描全量的order_money表。order_money是下单时候才会产生,我拒收订单根本不知道它的下单时间根本不知道怎么拿,而且order_money没有任何标记,我全量扫描money表程序OOM。我的数据是通过Canal监控过来的,我需要写flink-sql来进行join。 > > > > >

Re: flink-sql-gateway支持远程吗

2021-01-22 文章 yang nick
可以试试zeppelin 罗显宴 <15927482...@163.com> 于2021年1月23日周六 上午11:19写道: > > 大佬们,我试过flinksql-client和flink-sql-gateway发现都是把代码提交到本地集群运行,好像不能提交到指定远程集群上执行任务,请问大佬们,怎么提交代码到远程集群运行 > > > | | > 15927482803 > | > | > 邮箱:15927482...@163.com > | > > 签名由 网易邮箱大师 定制

Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。

2021-01-22 文章 yang nick
我觉的既然是做数仓,这里没啥好纠结的,数据已经在数仓里面了,做成宽表就行了 徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:11写道: > > 我起初也有这个想法,但是他们业务告诉我,order_reject表是单独存在的。拒收单子只会在order_reject出现,而order_money只在它下单的时候写入,拒收订单数据过来order_money不会有任何变化,所以我即使拿到每天的拒收订单去join也是要读取全量order_money。 > > > > > --原始邮件-- >

?????? ????????????Orders????????????????join????????????????????

2021-01-22 文章 ??????
order_reject??order_rejectorder_money??order_money??join??order_money?? ---- ??:

回复:请教关于Flink yarnship的使用

2021-01-22 文章 叶贤勋
URL url = this.getClass().getClassLoader().getResource("conf”); String dir = url.getFile(); dir目录下应该会包含ship的配置文件,你可以试下。 在2021年01月22日 15:38,Yan Tang 写道: 我把配置和jar包分开了,用-yt option将配置文件Ship到yarn cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么? 我的提交命令: -yt /path/to/conf code:

Re: 请教关于Flink yarnship的使用

2021-01-22 文章 silence
你可以尝试同时指定-C "file:///path/to/conf/cmp_online.cfg" 以及 -yt /path/to/conf 来进行测试 然后代码里这么获取this.getClass().getResourceAsStream("cmp_online.cfg") -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 请教关于Flink yarnship的使用

2021-01-22 文章 Yan Tang
如果-yt 不适用我这种场景,真不知道这个option是做什么的了。在spark中我用的就是--files,可以达到我想要的效果。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 根据业务需求选择合适的flink state

2021-01-22 文章 张锴
@赵一旦 可以添加一下微信好友吗,具体的实践上还有点问题,我是在window后直接reduce(new myReduceFunc(),new AssignWindowProcessFunc())自定义了这两个方法,但是效果还是有点问题,不知道我的写法是不是有问题 赵一旦 于2021年1月22日周五 上午10:10写道: > 我理解你要的最终mysql结果表是: > 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间); > > 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。 > > >

退订

2021-01-22 文章 Natasha
退订

Re: 请教关于Flink yarnship的使用

2021-01-22 文章 yang nick
这个方法应该是读取本地的文件,但是你放到yarn中执行,就会找不到这个文件。所以建议可以把配置上传到hdfs中试试看 Yan Tang 于2021年1月22日周五 下午4:53写道: > 我把配置和jar包分开了,用-yt option将配置文件Ship到yarn > cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么? > 我的提交命令: > -yt /path/to/conf > > code: > this.getClass().getResourceAsStream("conf/cmp_online.cfg") > 但一直返回null. > > > >

Re: Flink 并行度问题

2021-01-22 文章 gimlee
并行度和CPU的核数没啥关系。 设置slot数量也不代表使用多少个CPU。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 并行度问题

2021-01-22 文章 yang nick
如果是 standalone的模式部署在一台机器上,那么据我了解,只会有一个TM,一个TM可以有多个slot Jacob <17691150...@163.com> 于2021年1月22日周五 下午4:18写道: > 使用Flink以来,一直有一个问题困扰着。 > > > Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。 > > 比如Flink消费kafka > >

请教关于Flink yarnship的使用

2021-01-22 文章 Yan Tang
我把配置和jar包分开了,用-yt option将配置文件Ship到yarn cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么? 我的提交命令: -yt /path/to/conf code: this.getClass().getResourceAsStream("conf/cmp_online.cfg") 但一直返回null. -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Flink 并行度问题

2021-01-22 文章 Ye Chen
@jacob hi, TaskManager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,并且 Slot 只对内存隔离,没有对 CPU 隔离。 而slot 和并行度的关系是:Slot 是指 TaskManager 最大能并发执行的能力,parallelism 是指 TaskManager 实际使用的并发能力。 个人见解,并行度的设置一般无需考虑CPU。 在 2021-01-22 16:18:32,"Jacob" <17691150...@163.com> 写道: >使用Flink以来,一直有一个问题困扰着。 > > >Flink

Re: Flink 并行度问题

2021-01-22 文章 赵一旦
不清楚你为啥需要想这些,集群的并行度你随意设置就好,考虑CPU核数等的地方都只是考虑理想情况的并发。 比如你CPU最高10个核,来20个线程也没办法“并行”,但是可以“并发”。如果你的线程事情很少,10个并发是无法占满10个CPU核的,所以没任何理由因为CPU核的数量去限制你的并发度。 Jacob <17691150...@163.com> 于2021年1月22日周五 下午4:18写道: > 使用Flink以来,一直有一个问题困扰着。 > > > Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。 > > 比如Flink消费kafka > >

Re: Pyflink JVM Metaspace 内存泄漏定位

2021-01-22 文章 YueKun
关闭问题,已经解决,解决方法是不通过 pipeline.jars 的方式跟随python任务动态提交jar包,改为放在 FLINK_HOME/lib 下 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Exception in thread "main" java.lang.RuntimeException: Unknown call expression: avg(amount)

2021-01-22 文章 Appleyuchi
我在驗證 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html 中的 "Distinct aggregation on over window"(請在上述鏈接內,Ctrl+f搜索該雙引號內的整個字符串) 測試代碼: distinctaggregation3.java https://paste.ubuntu.com/p/7HJ9W3hVVN/ 測試用的POJO: OrderStream.java

Flink 并行度问题

2021-01-22 文章 Jacob
使用Flink以来,一直有一个问题困扰着。 Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。 比如Flink消费kafka topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。 如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ?

flink-Kafka 报错:ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

2021-01-22 文章 lp
测试代码如下: -- public class Sink_KafkaSink_1{ public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromPropertiesFile(Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties")); String host =

Re: flink sql 执行limit 很少的语句依然会暴增

2021-01-22 文章 zhang hao
嗯嗯 好的 谢谢大家 ,应该就是这个问题了,merge到分支验证下 On Fri, Jan 22, 2021 at 11:35 AM Shengkai Fang wrote: > hi, LIMIT PUSH DOWN 近期已经merge进了master分支了。 > > [1] https://github.com/apache/flink/pull/13800 > > Land 于2021年1月22日周五 上午11:28写道: > > > 可能是没有下推到MySQL执行。 > > 问题和我遇到的类似: > >