Flink 1.12 Blink planner timestamp类型转换异常
Hi, all flink1.12 Blink planner有人遇到过这样的问题么: 下面是简化的逻辑 DataStream ds = .map(xxxRichMapFunction); Table table = tableEnv.fromDataStream(ds); tableEnv.toAppendStream(table.select(xxx).keyBy(xxx), Row.class).addSink(xxxRichSinkFunction); xxxRichMapFunction中对某个字段写了row.setField(index, java.sql.timestamp.valueOf(str)); 即写入的值是一个 java.sql.timestamp类型的字段,且xxxRichMapFunction 中getProducedType() 中对应的位置也是Types.SQL_TIMESTAMP。 但在xxxRichSinkFunction我读取该字段时,却变成了java.time.LocalDateTime类型,导致在JdbcUtils.setRecordToStatement中抛出了java.lang.ClassCastException 我用 Blink planner就会有这个问题,如果程序启动时用useOldPlanner()就不会有这个问题。 张健
Flink 1.12 Blink planner timestamp类型转换异常
Hi, all flink1.12 Blink planner有人遇到过这样的问题么: 下面是简化的逻辑 DataStream ds = .map(xxxRichMapFunction); Table table = tableEnv.fromDataStream(ds); tableEnv.toAppendStream(table.select(xxx).keyBy(xxx), Row.class).addSink(xxxRichSinkFunction); xxxRichMapFunction中对某个字段写了row.setField(index, java.sql.timestamp.valueOf(str)); 即写入的值是一个 java.sql.timestamp类型的字段,且xxxRichMapFunction 中getProducedType() 中对应的位置也是Types.SQL_TIMESTAMP。 但在xxxRichSinkFunction我读取该字段时,却变成了java.time.LocalDateTime类型,导致在JdbcUtils.setRecordToStatement中抛出了java.lang.ClassCastException 我用 Blink planner就会有这个问题,如果程序启动时用useOldPlanner()就不会有这个问题。 张健
退订
退订 qhp...@hotmail.com
Re: sql-gateway和jdbc-driver还维护吗?
Hi Ada, sql-gateway之前没有维护起来,确实是一个遗憾。 最近我们也关注到大家对batch的兴趣越来越浓,sql-gateway还会继续维护。 btw,非常欢迎分享一下你们使用Flink替换Spark遇到的一些痛点,我们会逐渐去解决这些痛点 Best, Godfrey Ada Wong 于2022年1月12日周三 10:09写道: > > cc tsreaper and Godfrey He > > 文末丶 <809097...@qq.com.invalid> 于2022年1月10日周一 19:39写道: > > > > > 试下https://github.com/DataLinkDC/dlink 看看能不能满足你的需求 > > > > > > > > > > --原始邮件-- > > 发件人: > > "user-zh" > > > > > 发送时间:2022年1月10日(星期一) 晚上7:32 > > 收件人:"user-zh" > > > 主题:Re: sql-gateway和jdbc-driver还维护吗? > > > > > > > > https://github.com/ververica/flink-jdbc-driver > > https://github.com/ververica/flink-sql-gateway > > > > Ada Wong > > > 我看这俩项目一两年没更新了。想用Flink彻底替换到Spark,这俩项目是刚需,用来替换SparkThriftServer。
Re: sql-gateway和jdbc-driver还维护吗?
cc tsreaper and Godfrey He 文末丶 <809097...@qq.com.invalid> 于2022年1月10日周一 19:39写道: > > 试下https://github.com/DataLinkDC/dlink 看看能不能满足你的需求 > > > > > --原始邮件-- > 发件人: > "user-zh" > > 发送时间:2022年1月10日(星期一) 晚上7:32 > 收件人:"user-zh" > 主题:Re: sql-gateway和jdbc-driver还维护吗? > > > > https://github.com/ververica/flink-jdbc-driver > https://github.com/ververica/flink-sql-gateway > > Ada Wong > 我看这俩项目一两年没更新了。想用Flink彻底替换到Spark,这俩项目是刚需,用来替换SparkThriftServer。
Flink 1.12 Blink planner timestamp类型转换异常
Hi, all flink1.12 Blink planner有人遇到过这样的问题么: 下面是简化的逻辑 DataStream ds = .map(xxxRichMapFunction); Table table = tableEnv.fromDataStream(ds); tableEnv.toAppendStream(table.select(xxx).keyBy(xxx), Row.class).addSink(xxxRichSinkFunction); xxxRichMapFunction中对某个字段写了row.setField(index, java.sql.timestamp.valueOf(str)); 即写入的值是一个 java.sql.timestamp类型的字段,且xxxRichMapFunction 中getProducedType() 中对应的位置也是Types.SQL_TIMESTAMP。 但在xxxRichSinkFunction我读取该字段时,却变成了java.time.LocalDateTime类型,导致在JdbcUtils.setRecordToStatement中抛出了java.lang.ClassCastException 我用 Blink planner就会有这个问题,如果程序启动时用useOldPlanner()就不会有这个问题。 张健
(无主题)
退订
Re: 如何确定分配内存的大小
目前这个更多还是一个经验值,和具体业务有关使用有关,建议任务运行后观察JM和TM的GC情况后再做调整 许友昌 <18243083...@163.com> 于2022年1月10日周一 15:18写道: > 请问在启动flink 任务时,要如何确定该分配多少内存给 jobmanager,分配多少给 taskmanager,当我们指定 -ytm 1024 > 或 -ytm 2048 的依据是什么?
Re: 谁能解释一下 GlobalStreamExchangeMode 这几种交换模式的不同和使用场景吗?
在生产环境中使用Flink是批示作业是OK的,不是很依赖Flink Remote Shuffle Service Flink Remote Shuffle Service 主要解决大数据量Shuffle场景下的稳定性,目前Batch会将Shuffle的结果写本地磁盘,数量大的时候会容易将磁盘写满,稳定性也相对比较差 casel.chen 于2021年12月2日周四 08:26写道: > GlobalStreamExchangeMode 这几种交换模式的不同和使用场景是什么?哪些适合流式作业,哪些适合批式作业? > Flink Remote Shuffle Service的推出是不是意味着可以在生产环境使用Flink处理批式作业?谢谢! > > > package org.apache.flink.streaming.api.graph; > > > > > import org.apache.flink.annotation.Internal; > > > > > @Internal > > public enum GlobalStreamExchangeMode { > > ALL_EDGES_BLOCKING, > > FORWARD_EDGES_PIPELINED, > > POINTWISE_EDGES_PIPELINED, > > ALL_EDGES_PIPELINED, > > ALL_EDGES_PIPELINED_APPROXIMATE; > > > > > private GlobalStreamExchangeMode() { > > } > > } > > >
Re: 关于streamFileSink在checkpoint下生成文件问题
直接用的开源版本吗?还是公司内部有改动,原生的cp是固定频率,而很多公司离线计算都是整点触发的,为了减少延迟,会自定义在整点触发一次cp,开源目前没有这个feature 黄志高 于2021年12月1日周三 21:53写道: > hi,各位大佬,咨询个问题 > > > 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看 > > > >
Re: Re: 关于streamFileSink在checkpoint下生成文件问题
直接用的开源版本吗?还是公司内部有改动,原生的cp是固定频率,而很多公司离线计算都是整点触发的,为了减少延迟,会自定义在整点触发一次cp,开源目前没有这个feature 黄志高 于2021年12月2日周四 14:14写道: > | > > > > > 32684 > | > COMPLETED > | 8/8 | 13:52:36 | 13:52:38 | 2s | 126 KB | 0 B | > | | 32683 | > COMPLETED > | 8/8 | 13:42:36 | 13:42:39 | 2s | 126 KB | 0 B | > | | 32682 | > COMPLETED > | 8/8 | 13:32:36 | 13:32:39 | 2s | 126 KB | 0 B | > | | 32681 | > COMPLETED > | 8/8 | 13:22:36 | 13:22:39 | 2s | 125 KB | 0 B | > | | 32680 | > COMPLETED > | 8/8 | 13:12:36 | 13:12:39 | 2s | 125 KB | 0 B | > | | 32679 | > COMPLETED > | 8/8 | 13:02:36 | 13:02:41 | 4s | 214 KB | 0 B | > 上图是checkpoint > > > 这个是在11月30号0时段生成的文件 > 2021-11-30 00:00:011080827 athena_other-0-217891.gz > 2021-11-30 00:02:424309209 athena_other-0-217892.gz > 2021-11-30 00:12:403902474 athena_other-0-217893.gz > 2021-11-30 00:22:403886322 athena_other-0-217894.gz > 2021-11-30 00:32:403988037 athena_other-0-217895.gz > 2021-11-30 00:42:403892343 athena_other-0-217896.gz > 2021-11-30 00:52:392972183 athena_other-0-217897.gz > 2021-11-30 00:00:011125774 athena_other-1-219679.gz > 2021-11-30 00:02:424338748 athena_other-1-219680.gz > 2021-11-30 00:12:404204571 athena_other-1-219681.gz > 2021-11-30 00:22:403852791 athena_other-1-219682.gz > 2021-11-30 00:32:404025214 athena_other-1-219683.gz > 2021-11-30 00:42:404205107 athena_other-1-219684.gz > 2021-11-30 00:52:392922192 athena_other-1-219685.gz > 2021-11-30 00:00:011103734 athena_other-2-220084.gz > > > 这个是1点生成的文件 > 2021-11-30 01:00:011228793 athena_other-0-217951.gz > 2021-11-30 01:02:424243566 athena_other-0-217952.gz > 2021-11-30 01:12:404106305 athena_other-0-217953.gz > 2021-11-30 01:22:404456214 athena_other-0-217954.gz > 2021-11-30 01:32:414303156 athena_other-0-217955.gz > 2021-11-30 01:42:404688872 athena_other-0-217956.gz > 2021-11-30 01:52:403251910 athena_other-0-217957.gz > 2021-11-30 01:00:011163354 athena_other-1-219736.gz > 2021-11-30 01:02:424405233 athena_other-1-219737.gz > 2021-11-30 01:12:404094502 athena_other-1-219738.gz > 2021-11-30 01:22:404395071 athena_other-1-219739.gz > 2021-11-30 01:32:404205169 athena_other-1-219740.gz > 2021-11-30 01:42:404432610 athena_other-1-219741.gz > 2021-11-30 01:52:403224111 athena_other-1-219742.gz > 2021-11-30 01:00:011163964 athena_other-2-220137.gz > > > > > 之前的截图无法发送,我把文件贴出来,打扰了 > > > > > > > > 在 2021-12-02 13:52:43,"黄志高" 写道: > > > > > > Hi,我把文件放到下面的,文件在checkpoint可见我是理解的,但是文件的生成时间应该是在checkpoint以后是正常的,但是我却在每个整点时段看见数据文件,如下图所示,按理说文件的生成都是在checkpoint之后的,也就是2分,12,22,32,42,52分后,而每个00分都会生成一个数据文件,不理解这个文件怎么生成的,内部的滚动策略是OnCheckpointRollingPolicy > > > > > > > > > > > > > > > 在 2021-12-02 11:37:31,"Caizhi Weng" 写道: > >Hi! > > > >邮件里看不到图片和附件,建议使用外部图床。 > > > >partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证 > >exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。 > > > >黄志高 于2021年12月1日周三 下午9:53写道: > > > >> hi,各位大佬,咨询个问题 > >> > >> > 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整点时刻生成一个文件,我查阅下源码,没有找到整点触发滚动生成文件的逻辑,有大佬可以帮忙分析一下这个整点时刻生成的文件是怎么来的吗,它属于哪个周期的,附件中是我flink任务的checkpoint时间点,和2021年11月30日在1点和2点生成的文件截图,在1点和2点的00分都生成了一个文件,望大佬帮忙看看 > >> > >> > >> > >> > > > > > >
回复: flink sql 如何提高下游并发度?
hi, 设置了parallelism=10 ,实际上是分配了 10 个 slot,flink 是会共享 slot 的,所以 sink 会有 10 线程。 在2022年1月11日 16:53,RS 写道: Hi, 请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10? 如果source是10的话,那还有7个线程就空闲了? 在 2022-01-11 11:10:41,"Caizhi Weng" 写道: Hi! 可以设置 parallelism.default 为需要的并发数。 Jeff 于2022年1月9日周日 19:44写道: 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?
Re: Flink on Native K8s 部署模式下Tm和Jm容器配置Hosts问题
你可以通过环境变量或者flink config option的方式来指定kube config export KUBECONFIG=/path/of/kube.config 或者 -Dkubernetes.config.file=/path/of/kube.config 具体的代码在这里[1] [1]. https://github.com/apache/flink/blob/master/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java#L58 Best, Yang JianWen Huang 于2022年1月10日周一 22:04写道: > 首先感谢您答复。我也想到了采用第二种JOB动态+ConfigMap挂到Flink Client Pod中,然后命令提交。 > 另外您和官方文档都提到kube config的配置。请问flink client在源码实现中是在哪个地方去解析读取kube config的? > > Yang Wang 于2022年1月10日周一 15:17写道: > > > > 抱歉回复晚了 > > > > 在实践中,Flink on Native K8s的部署方式需要一个机器同时拥有k8s和flink客户端才能很好的完成部署工作。 > > > > Flink client并不依赖K8s客户端的,只要有对应的kube config就可以了 > > > > > > 你说的两种方法都是可以的,而且也没有本质上的差异。都是把Flink client运行在集群内来完成提交,第一种是常驻的,第二种是动态起的 。 > > 如果作业使用的pod template都是一样的,那就可以自己保存在ConfigMap中然后挂载给Flink client pod就可以了。 > > 如果每个作业使用的都不同,就只能按照你说的方法了 > > > > > > 另外,还有一个可行的思路是开发一个你们自己的K8s operator,然后通过CR的方式进行传递。可以参考这个简单的demo[1] > > > > [1]. https://github.com/wangyang0918/flink-native-k8s-operator > > > > > > Best, > > Yang > > > > > > > > JianWen Huang 于2021年12月30日周四 00:01写道: > > > > > 明白了。感谢。 > > > 在实践中,Flink on Native K8s的部署方式需要一个机器同时拥有k8s和flink客户端才能很好的完成部署工作。 > > > 请问在工程实践上有什么比较好的持续集成提交方式。我目前想到两种。 > > > 1.在k8s 启动一个带flink客户端的容器。在容器内部进行命令行提交。 > > > 2.在k8s以带Flink客户端的镜像启动一个Job类型作业,然后在作业运行时进行命令提交。 > > > > > > > 第1种对于kubernetes.pod-template-file的提交需要把kubernetes.pod-template-file中的模板文件cp到容器中。 > > > 第2种需要提前把kubernetes.pod-template-file文件打到带Flink客户端的镜像中。 > > > 请问您有更好的方法吗。 > > > > > > Yang Wang 于2021年12月26日周日 16:39写道: > > > > > > > > 拿如下提交命令举例,pod-temlate.yaml是在和运行run-application这个命令相同的机器上面。Flink > > > > client会自动把这个文件存放到ConfigMap,然后挂载给JM的 > > > > user jar(StateMachineExample.jar)是需要在镜像里面 > > > > > > > > 注意:一般需要在镜像里面的都会使用local://这个schema,本地文件则不需要 > > > > > > > > bin/flink run-application -t kubernetes-application \ > > > > -Dkubernetes.cluster-id=my-flink-cluster \ > > > > -Dkubernetes.pod-template-file=/path/of/pod-template.yaml \ > > > > local:///opt/flink/examples/streaming/StateMachineExample.jar > > > > > > > > > > > > > > > > 如果还是不明白,看一下这个测试的实现就清楚了[1] > > > > > > > > [1]. > > > > > > > > https://github.com/apache/flink/blob/master/flink-end-to-end-tests/test-scripts/test_kubernetes_application_ha.sh > > > > > > > > > > > > Best, > > > > Yang > > > > > > > > 黄剑文 于2021年12月24日周五 17:57写道: > > > > > > > > > client-local的文件,不是镜像里面的。这句话该怎么理解?因为run-application > > > > > > > > > > > > > > 模式下是需要将用户jar包跟flink标准镜像打到一起形成自己镜像然后进行提交。那么这个文件该放在哪个地方?目前我指定路径发现读的是镜像包中的路径。如/opt/my-pod-template。读的是镜像中/opt/my-pod-template文件。 > > > > > > > > > > 谢谢您的回复。 > > > > > > > > > > Yang Wang 于2021年12月24日周五 11:18写道: > > > > > > > > > > > > 使用flink > > > > > > > > > > run-application来提交任务时,kubernetes.pod-template-file需要指定的是一个client-local的文件 > > > > > > 不是镜像里面的 > > > > > > > > > > > > Best, > > > > > > Yang > > > > > > > > > > > > hjw <1010445...@qq.com.invalid> 于2021年12月23日周四 22:21写道: > > > > > > > > > > > > > Flink版本:1.13Flink基于Native K8s > > > > > > > > > > > > > > > > 部署模式下,因为有场景需要,jobmanager和taskmanager需要配置一些特定的hosts,查阅官方文档后发现可以支持自己指定一些pod-Template来指定jm和tm的一些K8s部署行为,但这些pod-Template需要打在提交客户端镜像里。 > > > > > > > > > > > > > > > > > > > > > > > 问题是jm和tm在不同环境下需要配置的Hosts并不相同。如开发环境,测试环境,生产环境。这意味着不同环境需维护不同的镜像。请问各位在使用上有什么好方法去解决呢。谢谢。 > > > > > > > > >
Re: flink sql 如何提高下游并发度?
可以的,提供一个思路,读取了kafka的数据后,直接输出原生的字节流后,接一层map算子做序列化相关工作,map算子的并发度你可以自己控制,这样kafka拉取就不会是瓶颈,大量的计算工作放到了map中,而map的并发度是可以自己控制的 Jeff 于2022年1月9日周日 19:45写道: > 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?
回复: flink sql 如何提高下游并发度?
hi 是 10 目前 source 还不支持单独设置并发度,但是 sink 是支持的,当然如果没有单独设置的话 sink 也是 10 Best JasonLee 在2022年01月11日 16:52,RS 写道: Hi, 请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10? 如果source是10的话,那还有7个线程就空闲了? 在 2022-01-11 11:10:41,"Caizhi Weng" 写道: Hi! 可以设置 parallelism.default 为需要的并发数。 Jeff 于2022年1月9日周日 19:44写道: 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?
Re: flink sql 如何提高下游并发度?
可以的,提供一个思路,读取了kafka的数据后,直接输出原生的字节流后,接一层map算子做序列化相关工作,map算子的并发度你可以自己控制,这样kafka拉取就不会是瓶颈,大量的计算工作放到了map中,而map的并发度是可以自己控制的 Caizhi Weng 于2022年1月11日周二 11:11写道: > Hi! > > 可以设置 parallelism.default 为需要的并发数。 > > Jeff 于2022年1月9日周日 19:44写道: > > > 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢? >
Re:Re: flink sql 如何提高下游并发度?
Hi, 请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10? 如果source是10的话,那还有7个线程就空闲了? 在 2022-01-11 11:10:41,"Caizhi Weng" 写道: >Hi! > >可以设置 parallelism.default 为需要的并发数。 > >Jeff 于2022年1月9日周日 19:44写道: > >> 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?