TTL??keybykey1state??
----
??: ""https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
news_...@163.com
目前通过自定义OrcBulkWriterFactory方式,拿到一个一个的ColumnVector,然后设置值。
对于简单类型,API看起来很清晰,但是Map类型没看懂怎么写。如下,对于serverTime是INT类型,直接设置vector[rowId]即可。那么对于MapColumnVector怎么设置呢,将多个key-value对写进去具体怎么写呢。
serverTimeColumnVector.vector[rowId] = ele.getTimestamp();
MapColumnVector dColumnVector = (MapColumnVector)
此外,写ORC格式文件,对于Map格式的有人知道怎么写的话给个示例吧。
如下,拿到MapColumnVector之后怎么写呢,目前非Map的简单字段都比较清晰,直接设置xxxColumnVector.vector[rowId]的值即可。但是MapColumnVector的API比较乱,没看懂怎么用。
MapColumnVector dColumnVector = (MapColumnVector) batch.cols[2];
赵一旦 于2021年1月23日周六 下午1:42写道:
> 已解决。覆盖了flink这部分源码去除了对非hdfs的schema限制。
>
> 张锴
已解决。覆盖了flink这部分源码去除了对非hdfs的schema限制。
张锴 于2021年1月21日周四 下午7:35写道:
> @赵一旦
> 另外,上次我还提了一个问题请教你,我试了你说的那个想法,但是好像有点问题,你可以看一下
>
> 张锴 于2021年1月21日周四 下午7:13写道:
>
> > 我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的
> >
> > 赵一旦 于2021年1月21日周四 下午7:05写道:
> >
> >> @Michael Ran; 嗯嗯,没关系。
> >>
> >> @张锴
已解决。重改写了flink源码覆盖了这部分限制就可以了。
赵一旦 于2021年1月22日周五 上午10:17写道:
> 如题,为什么仅支持recoverableWriter,如果我使用的文件系统不支持怎么办呢,必须自定义sink吗?
>
>
> 我这边用的是一个公司自研的大型分布式文件系统,支持hadoop协议(但不清楚是否所有特性都支持),目前使用streamFileSink和FileSink貌似都无法正常写。
>
> 不清楚是否有其他问题,至少当前是卡住在这个recoverable上了。
>
> 报错是只有hdfs才支持recoverableWriter。
>
>
=_= ??thank you??
----
??: "yang nick"
我感觉你这个用实时很难做,涉及到状态更新的无限流,需要配置 state ttl
徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:23写道:
>
> 我遇到的难题是,拒收订单想拿到payAment字段必须扫描全量的order_money表。order_money是下单时候才会产生,我拒收订单根本不知道它的下单时间根本不知道怎么拿,而且order_money没有任何标记,我全量扫描money表程序OOM。我的数据是通过Canal监控过来的,我需要写flink-sql来进行join。
>
>
>
>
>
可以试试zeppelin
罗显宴 <15927482...@163.com> 于2021年1月23日周六 上午11:19写道:
>
> 大佬们,我试过flinksql-client和flink-sql-gateway发现都是把代码提交到本地集群运行,好像不能提交到指定远程集群上执行任务,请问大佬们,怎么提交代码到远程集群运行
>
>
> | |
> 15927482803
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
我觉的既然是做数仓,这里没啥好纠结的,数据已经在数仓里面了,做成宽表就行了
徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:11写道:
>
> 我起初也有这个想法,但是他们业务告诉我,order_reject表是单独存在的。拒收单子只会在order_reject出现,而order_money只在它下单的时候写入,拒收订单数据过来order_money不会有任何变化,所以我即使拿到每天的拒收订单去join也是要读取全量order_money。
>
>
>
>
> --原始邮件--
>
order_reject??order_rejectorder_money??order_money??join??order_money??
----
??:
URL url = this.getClass().getClassLoader().getResource("conf”);
String dir = url.getFile();
dir目录下应该会包含ship的配置文件,你可以试下。
在2021年01月22日 15:38,Yan Tang 写道:
我把配置和jar包分开了,用-yt option将配置文件Ship到yarn cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么?
我的提交命令:
-yt /path/to/conf
code:
你可以尝试同时指定-C "file:///path/to/conf/cmp_online.cfg" 以及 -yt /path/to/conf 来进行测试
然后代码里这么获取this.getClass().getResourceAsStream("cmp_online.cfg")
--
Sent from: http://apache-flink.147419.n8.nabble.com/
如果-yt 不适用我这种场景,真不知道这个option是做什么的了。在spark中我用的就是--files,可以达到我想要的效果。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
@赵一旦
可以添加一下微信好友吗,具体的实践上还有点问题,我是在window后直接reduce(new myReduceFunc(),new
AssignWindowProcessFunc())自定义了这两个方法,但是效果还是有点问题,不知道我的写法是不是有问题
赵一旦 于2021年1月22日周五 上午10:10写道:
> 我理解你要的最终mysql结果表是:
> 直播间ID;用户ID;上线时间;下线时间;durationn=(下线时间 - 上线时间);
>
> 如果user1在直播间1,一天内出现10次,就出现10个记录,分别记录了每次的duration。
>
>
>
退订
这个方法应该是读取本地的文件,但是你放到yarn中执行,就会找不到这个文件。所以建议可以把配置上传到hdfs中试试看
Yan Tang 于2021年1月22日周五 下午4:53写道:
> 我把配置和jar包分开了,用-yt option将配置文件Ship到yarn
> cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么?
> 我的提交命令:
> -yt /path/to/conf
>
> code:
> this.getClass().getResourceAsStream("conf/cmp_online.cfg")
> 但一直返回null.
>
>
>
>
并行度和CPU的核数没啥关系。
设置slot数量也不代表使用多少个CPU。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
如果是 standalone的模式部署在一台机器上,那么据我了解,只会有一个TM,一个TM可以有多个slot
Jacob <17691150...@163.com> 于2021年1月22日周五 下午4:18写道:
> 使用Flink以来,一直有一个问题困扰着。
>
>
> Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。
>
> 比如Flink消费kafka
>
>
我把配置和jar包分开了,用-yt option将配置文件Ship到yarn cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么?
我的提交命令:
-yt /path/to/conf
code:
this.getClass().getResourceAsStream("conf/cmp_online.cfg")
但一直返回null.
--
Sent from: http://apache-flink.147419.n8.nabble.com/
@jacob
hi, TaskManager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,并且 Slot 只对内存隔离,没有对 CPU 隔离。
而slot 和并行度的关系是:Slot 是指 TaskManager 最大能并发执行的能力,parallelism 是指 TaskManager
实际使用的并发能力。
个人见解,并行度的设置一般无需考虑CPU。
在 2021-01-22 16:18:32,"Jacob" <17691150...@163.com> 写道:
>使用Flink以来,一直有一个问题困扰着。
>
>
>Flink
不清楚你为啥需要想这些,集群的并行度你随意设置就好,考虑CPU核数等的地方都只是考虑理想情况的并发。
比如你CPU最高10个核,来20个线程也没办法“并行”,但是可以“并发”。如果你的线程事情很少,10个并发是无法占满10个CPU核的,所以没任何理由因为CPU核的数量去限制你的并发度。
Jacob <17691150...@163.com> 于2021年1月22日周五 下午4:18写道:
> 使用Flink以来,一直有一个问题困扰着。
>
>
> Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。
>
> 比如Flink消费kafka
>
>
关闭问题,已经解决,解决方法是不通过 pipeline.jars 的方式跟随python任务动态提交jar包,改为放在 FLINK_HOME/lib 下
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我在驗證
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html
中的
"Distinct aggregation on over window"(請在上述鏈接內,Ctrl+f搜索該雙引號內的整個字符串)
測試代碼:
distinctaggregation3.java
https://paste.ubuntu.com/p/7HJ9W3hVVN/
測試用的POJO:
OrderStream.java
使用Flink以来,一直有一个问题困扰着。
Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。
比如Flink消费kafka
topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。
如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ?
测试代码如下:
--
public class Sink_KafkaSink_1{
public static void main(String[] args) throws Exception {
final ParameterTool params =
ParameterTool.fromPropertiesFile(Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties"));
String host =
嗯嗯 好的 谢谢大家 ,应该就是这个问题了,merge到分支验证下
On Fri, Jan 22, 2021 at 11:35 AM Shengkai Fang wrote:
> hi, LIMIT PUSH DOWN 近期已经merge进了master分支了。
>
> [1] https://github.com/apache/flink/pull/13800
>
> Land 于2021年1月22日周五 上午11:28写道:
>
> > 可能是没有下推到MySQL执行。
> > 问题和我遇到的类似:
> >
26 matches
Mail list logo