Re: Reply: Error reported as a bug
Hi, judging from the error, a string field in your job is being treated as a timestamp, which makes the job's code generation fail. Your job logic is fairly complex; go through the time-related fields and check whether each field's type is handled correctly, for example the eventTime field.

Best,
Shammon FY

On Mon, May 15, 2023 at 7:29 PM lxk wrote:
> Hi, judging from the error, this is caused by incompatible types.
> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column
> 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
> You could try adjusting the table schema, or use the appropriate functions to convert the field type.
>
> At 2023-05-15 18:29:15, "小昌同学" wrote:
> >(full job source snipped; it is quoted in full in the original message "Reply: Error reported as a bug" below)
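A minimal sketch of the conversion lxk suggests, assuming a hypothetical eventTime STRING column in 'yyyy-MM-dd HH:mm:ss' format (the job's actual table schema is not shown in the thread):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class EventTimeCastSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // Throwaway source with a STRING time column standing in for eventTime.
            tEnv.executeSql(
                    "CREATE TEMPORARY VIEW events AS "
                  + "SELECT * FROM (VALUES ('2023-05-15 18:29:15')) AS t(eventTime)");
            // TO_TIMESTAMP parses the string into TIMESTAMP(3) up front, so codegen
            // never needs to cast java.lang.String to java.time.LocalDateTime.
            tEnv.executeSql(
                    "SELECT eventTime, TO_TIMESTAMP(eventTime) AS eventTs FROM events")
                .print();
        }
    }

An explicit CAST(eventTime AS TIMESTAMP(3)) achieves the same, provided the string matches the default format.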
Re: Re: Can Flink skip the job-jar upload step when submitting a job?
Application Mode doesn't have this problem; the issue comes up when submitting jobs in Session Mode: ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar. Can the trailing job jar, TopSpeedWindowing.jar, be given as an hdfs/oss path? If a distributed-filesystem path is allowed, then the client would no longer upload the job jar to the JobManager, and the JobManager would download it by itself, right?

At 2023-05-15 19:27:21, "shimin huang" wrote:
>You could consider starting the job through the KubernetesClusterDescriptor in the flink-kubernetes dependency; see https://github.com/collabH/flink-deployer/blob/main/infrastructure/src/main/java/com/flink/plugins/inf/deployer/KubernetesClusterDeployer.java for reference.
>
>> On May 15, 2023, at 19:21, casel.chen wrote:
>>
>> (original question snipped; it appears in full below)
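For the Application Mode route, a sketch of a native-Kubernetes submission that uploads nothing, assuming the fat jar is already baked into the image (the image name and the /opt/flink/usrlib path are placeholders):

    # local:// tells Flink the jar lives inside the cluster's own image, so the
    # client ships no jar and the JobManager loads it from the local path.
    ./bin/flink run-application \
        --target kubernetes-application \
        -Dkubernetes.cluster-id=flinksql-demo \
        -Dkubernetes.container.image=registry.example.com/flink-platform:latest \
        local:///opt/flink/usrlib/flinksql.jar

As far as I know, a Session Mode submission always ships the jar from the client, which is why the discussion steers toward Application Mode for this setup.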
Re: Reply: Error reported as a bug
Hi, judging from the error, this is caused by incompatible types.
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
You could try adjusting the table schema, or use the appropriate functions to convert the field type.

At 2023-05-15 18:29:15, "小昌同学" wrote:
>(full job source snipped; it is quoted in full in the original message "Reply: Error reported as a bug" below)
Re: Can Flink skip the job-jar upload step when submitting a job?
You could consider starting the job through the KubernetesClusterDescriptor in the flink-kubernetes dependency; see https://github.com/collabH/flink-deployer/blob/main/infrastructure/src/main/java/com/flink/plugins/inf/deployer/KubernetesClusterDeployer.java for reference.

> On May 15, 2023, at 19:21, casel.chen wrote:
>
> (original question snipped; it appears in full below)
Can Flink skip the job-jar upload step when submitting a job?
We've built a real-time computing platform that submits Flink SQL jobs to run on k8s, and we found that every submission has to upload the platform's SQL job jar, flinksql.jar. Because this jar bundles all the connectors and formats the platform uses, the flinksql.jar fat jar is several hundred MB. On top of that, the k8s cluster hosting the platform and the k8s cluster actually running the jobs are different, and for some clusters the cross-cluster network transfer (across regions or even cloud vendors) is expensive. We've also baked flinksql.jar into the k8s image, so the JobManager can find the jar locally once the image is loaded. So: can Flink skip the jar-upload step when submitting a job? Is there a parameter to load the job jar directly from the local filesystem? That would both cut the network transfer overhead and speed up Flink SQL job submission.
StreamTableEnvironment initialization failed -- "Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath"
Hi ALL,

OS: CentOS 7.9
Flink version: 1.16.0

It looks like I'm hitting a notorious exception that has been reported since earlier Flink versions. The issue is triggered when the following Java code executes:

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

The detailed trace is as follows:

Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:109)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:101)
    at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122)
    at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:94)
    at com.sugon.cloud.paas.flink.cdc.FlinkCDC_mysql2doris_example.main(FlinkCDC_mysql2doris_example.java:63)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:533)
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:106)
    ... 4 more

What I've done:
1) Added the missing dependencies to "pom.xml", for example flink-table-api-java-uber 1.16.1 (provided) and flink-table-planner_${scala.binary.version} ${flink.version} (provided).
2) Tried two ways to run the application; both produce the same error (see above):
    mvn exec:java -Dexec.mainClass="xxx"
    java -jar target/xxx.jar

I'm confused by the error because all the necessary jar files do exist in Maven's local repository and in FLINK_HOME's lib dir. The complete "pom.xml" is attached.

Thanks,
Leo

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.mycompany.cloud.bigdata.flink</groupId>
        <artifactId>flink-cdc-doris-example</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>jar</packaging>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <flink.version>1.16.0</flink.version>
            <flink.connector.version>2.3.0</flink.connector.version>
            <scala.binary.version>2.12</scala.binary.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-connector-mysql-cdc</artifactId>
                <version>${flink.connector.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-uber</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <source>${maven.compiler.source}</source>
                        <target>${maven.compiler.target}</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.2.4</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>com.mycompany.cloud.bigdata.flink.cdc.FlinkCDC_mysql2doris_example</mainClass>
                                    </transformer>
                                </transformers>
                                <artifactSet>
                                    <excludes>
                                        <exclude>org.apache.flink:force-shading</exclude>
                                    </excludes>
                                </artifactSet>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
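One likely cause, offered as a guess rather than a confirmed diagnosis: both table dependencies are marked provided, so the shade plugin leaves them out of target/xxx.jar, and a plain `java -jar` run then has no planner on the classpath (FLINK_HOME/lib only helps when the job is launched through `flink run`; mvn exec:java likewise skips provided dependencies unless -Dexec.classpathScope=compile is set). A sketch of the change for standalone local runs:

    <!-- Sketch: for `java -jar` runs, let the table runtime and planner loader be
         shaded into the jar instead of marking them provided; keep provided only
         when FLINK_HOME/lib supplies these jars at runtime (e.g. via `flink run`). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
    </dependency>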
Reply: Error reported as a bug
package job;

import bean.BaseInfo;
import bean.MidInfo;
import bean.OutInfo;
import bean.ResultInfo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import config.FlinkConfig;
import function.MyProcessFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.OutputTag;
import sink.Sink2Mysql;
import utils.DateUtil;
import utils.DateUtils;
import utils.JdbcUtil;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Properties;

public class RytLogAnly4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Side-output tags for splitting the stream into requests and answers
        OutputTag<BaseInfo> requestStream = new OutputTag<BaseInfo>("requestStream") {};
        OutputTag<BaseInfo> answerStream = new OutputTag<BaseInfo>("answerStream") {};

        // 1. Connect to the test-environment Kafka source
        String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers");
        String topicName = FlinkConfig.config.getProperty("dev_topicName");
        String groupId = FlinkConfig.config.getProperty("dev_groupId");
        String devMode = FlinkConfig.config.getProperty("dev_mode");
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", servers);
        prop.setProperty("group.id", groupId);
        prop.setProperty("auto.offset.reset", devMode);
        DataStreamSource<String> sourceStream = env.addSource(
                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), prop));
        // Sample record:
        // {"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 -- <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}

        // 2. Parse the raw records into BaseInfo objects
        SingleOutputStreamOperator<BaseInfo> baseInfoStream = sourceStream.map(new MapFunction<String, BaseInfo>() {
            @Override
            public BaseInfo map(String value) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                // Server IP of this record
                String serverIp = jsonObject.getString("ip");
                // Raw payload of this record
                String datas = jsonObject.getString("data");

                String[] splits = datas.split("\n");
                HashMap<String, String> dataMap = new HashMap<>();
                // time goes into the custom type, used to correlate request and response times for the same num
                String time = splits[0].substring(7, 19);
                // subData goes into the custom type, used to tell whether this is a request or an answer
                String subData = datas.substring(0, 10);
                for (int i = 0; i < splits.length; i++) {
                    if (splits[i].contains("=")) {
                        splits[i] = splits[i].replaceFirst("=", "&");
                        String[] temp = splits[i].split("&");
                        if (temp.length > 1) {
                            dataMap.put(temp[0].toLowerCase(), temp[1]);
                        }
                    }
                }
                return new BaseInfo(dataMap.get("action"), serverIp,
                        DateUtil.string2Long(time), dataMap.get("handleserialno"), subData);
            }
        });

        // 3. Split baseInfoStream with the process function
        SingleOutputStreamOperator<BaseInfo> tagStream =
                baseInfoStream.process(new MyProcessFunction(requestStream, answerStream));

        // 4. Route to different output streams according to the tags
        DataStream<BaseInfo> requestDataStream = tagStream.getSideOutput(requestStream);
        DataStream<BaseInfo> answerDataStream = tagStream.getSideOutput(answerStream);
        requestDataStream.print("requestDataStream");
        answerDataStream.print("answerDataStream");

        // 5. The streams above only carry the action code, not the Chinese action label;
        //    they need to be joined against the MySQL table
        // 5.1 Handle the request stream first
Re: Error reported as a bug
Hi, could you paste the relevant code so that everyone can help analyze it? If you're using SQL, you could also paste the execution plan.

At 2023-05-15 17:11:42, "小昌同学" wrote:
>Hi all, when I program with the Table API I get the error "Caused by:
>org.apache.flink.api.common.InvalidProgramException: Table program cannot be
>compiled. This is a bug. Please file an issue."
>My Flink version is 1.14. Is there anyone from the community who could help look into this, or how should I proceed?
>
>小昌同学
>ccc0606fight...@163.com
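On the execution plan lxk asks for, a minimal sketch of how to print one (the query here is a placeholder, not the poster's job):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ExplainSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // explainSql returns the abstract syntax tree and the optimized
            // physical plan as text, which is what is worth pasting here.
            System.out.println(tEnv.explainSql(
                    "SELECT * FROM (VALUES ('a'), ('b')) AS t(word)"));
        }
    }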
Error reported as a bug
Hi all, when I program with the Table API, I get the error "Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue." My Flink version is 1.14. Is there anyone from the community who could help look into this, or how should I proceed?

小昌同学
ccc0606fight...@163.com
Re: Re: Re: Flink broadcast-stream state cleanup policy not taking effect
OK, thanks.

At 2023-05-15 15:49:12, "Hangxiang Yu" wrote:
>(reply snipped; it is quoted in full in the message below)
Re: Re: Flink broadcast-stream state cleanup policy not taking effect
Hi, you can refer to this ticket, which discussed adding TTL for Broadcast State; the discussion didn't go any further at the time:
https://issues.apache.org/jira/browse/FLINK-13721
If convenient, could you also share your use case and what you observed under the ticket? You can also vote for the issue there. I'll help take a look as well.

On Mon, May 15, 2023 at 1:41 PM lxk wrote:
> Seen this way, broadcast streams don't seem suitable for production use, since the state grows without bound. Is there an official plan to add a TTL feature for this?
> Or is there any way to clean up the state manually when using a broadcast stream?
>
> At 2023-05-15 11:28:54, "Hangxiang Yu" wrote:
> >Hi, operator state such as Broadcast State currently does not support setting a TTL; see the description of State TTL here:
> >https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl
> >
> >On Mon, May 15, 2023 at 11:05 AM lxk wrote:
> >
> >> Flink version: 1.14
> >> We broadcast a fairly small stream and match the main stream against it.
> >> In the main program I configured a state expiration policy:
> >>     SingleOutputStreamOperator<AdvertiseClick> baiduStream =
> >>             env.addSource(adBaiduClick).map(data -> JSON.parseObject(data,
> >>                     AdvertiseClick.class)).name("BaiDuAdClick");
> >>     MapStateDescriptor<String, AdvertiseClick> baiduInfoMap = new
> >>             MapStateDescriptor<>("advertiseInfo", String.class, AdvertiseClick.class);
> >>     StateTtlConfig ttlConfig = StateTtlConfig
> >>             .newBuilder(Time.days(7))
> >>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >>             .cleanupFullSnapshot()
> >>             .cleanupIncrementally(200, true)
> >>             .build();
> >>     baiduInfoMap.enableTimeToLive(ttlConfig);
> >> In the BroadcastProcessFunction I also set the cleanup policy:
> >>     public void open(Configuration parameters) throws Exception {
> >>         jedisClusterSink = Ad_MyRedisUtil.getJedisClient();
> >>         baiduInfoDesc = new MapStateDescriptor<String, AdvertiseClick>(
> >>                 "advertiseInfo", String.class, AdvertiseClick.class);
> >>         StateTtlConfig ttlConfig = StateTtlConfig
> >>                 .newBuilder(Time.days(7))
> >>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >>                 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >>                 .cleanupFullSnapshot()
> >>                 .cleanupIncrementally(200, true)
> >>                 .build();
> >>         baiduInfoDesc.enableTimeToLive(ttlConfig);
> >>     }
> >> But judging from the checkpoint size, the cleanup policy doesn't seem to take effect:
> >> the job has been running for 14 days and the overall checkpoint size keeps growing.
> >> https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg
> >> The expiration policies I use for other state all take effect; I don't know why it
> >> looks like it doesn't work for the broadcast stream, or whether my usage is wrong.
> >> Hope you can help take a look.
> >
> >--
> >Best,
> >Hangxiang.
>

--
Best,
Hangxiang.
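On the manual-cleanup question, a sketch of one workable pattern, under assumptions (broadcast elements are key/value string pairs here, and the 7-day window mirrors the thread's intended TTL): store each broadcast value together with its insertion time, and prune stale entries whenever a broadcast update arrives, since processBroadcastElement has read-write access to the broadcast state.

    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class PruningBroadcastFunction
            extends BroadcastProcessFunction<String, Tuple2<String, String>, String> {

        private static final long RETENTION_MS = 7L * 24 * 60 * 60 * 1000; // 7 days

        // Each broadcast value is stored together with its insertion timestamp.
        public static final MapStateDescriptor<String, Tuple2<String, Long>> DESCRIPTOR =
                new MapStateDescriptor<>(
                        "advertiseInfoWithTs",
                        TypeInformation.of(String.class),
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));

        @Override
        public void processElement(String value, ReadOnlyContext ctx,
                                   Collector<String> out) throws Exception {
            // Main-stream records only read the broadcast side.
            Tuple2<String, Long> hit = ctx.getBroadcastState(DESCRIPTOR).get(value);
            if (hit != null) {
                out.collect(value + " -> " + hit.f0);
            }
        }

        @Override
        public void processBroadcastElement(Tuple2<String, String> update, Context ctx,
                                            Collector<String> out) throws Exception {
            BroadcastState<String, Tuple2<String, Long>> state =
                    ctx.getBroadcastState(DESCRIPTOR);
            long now = System.currentTimeMillis();
            state.put(update.f0, Tuple2.of(update.f1, now));

            // Manual "TTL": collect expired keys first, then remove them, so the
            // state is not modified while its entries are being iterated.
            List<String> expired = new ArrayList<>();
            for (Map.Entry<String, Tuple2<String, Long>> e : state.entries()) {
                if (now - e.getValue().f1 > RETENTION_MS) {
                    expired.add(e.getKey());
                }
            }
            for (String key : expired) {
                state.remove(key);
            }
        }
    }

Wire it up with mainStream.connect(updates.broadcast(PruningBroadcastFunction.DESCRIPTOR)).process(new PruningBroadcastFunction()). State TTL still won't apply (that's FLINK-13721), and pruning only runs when broadcast updates arrive, but this keeps the broadcast state bounded in the meantime.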