Re:RE: flink如何正确使用mybatis
好的,谢谢老师 在 2023-07-26 21:04:20,"Jiabao Sun" 写道: >SqlSession 需要关闭,建议使用 SqlSessionManager,可以不用手动关闭 SqlSession。 > > >On 2023/07/18 02:13:16 lxk wrote: >> 在flink内需要使用mybatis做些简化查询的工作,目前我的使用方式如下 >> >> public class MybatisUtil { >> >> private static final Logger LOGGER = >> LogFactory.createNewLogger("MybatisUtil"); >> private static ThreadLocal tl = new >> ThreadLocal(); >> private static SqlSessionFactory factory = null; >> //private static SqlSession sqlSession = null; >> static { >> // 1 读取配置文件 config.xml >> InputStream in = null; >> try { >> in = Resources.getResourceAsStream("batis.xml"); >> } catch (IOException e) { >> // TODO Auto-generated catch block >> e.printStackTrace(); >> throw new RuntimeException(e); >> } >> // 2 创建SqlSessionFactory >> factory = new SqlSessionFactoryBuilder().build(in); >> } >> >> >> >> public static SqlSession getSqlSession(){ >> SqlSession sqlSession = tl.get(); >> if(sqlSession == null){ >> sqlSession = factory.openSession(); >> tl.set(sqlSession); >> LOGGER.info("sqlSession创建成功,连接为:{},时间为:{}", sqlSession,LocalTimeUtil.now()); >> } >> return sqlSession; >> } >> >> >> } >> 以上是工具类 >> 我在open方法中获取sqlsession,然后在invoke方法中使用mapper >> public void open(Configuration parameters) throws Exception { >> sqlSession = MybatisUtil.getSqlSession(); >> } >> >> public List map(HeaderFullWithPreOrder headerFullWithPreOrder) >> throws Exception { >> SelectAddCartMapper mapper = sqlSession.getMapper(SelectAddCartMapper.class); >> ...其他省略 >> } >> >> 想问下这种方式使用是否正确。以及sqlsession是否需要关闭,看见相关帖子有说如果sqlsession不关闭的话会把连接打满 >>
Flink窗口状态清除疑问
相关配置: Flink:1.16 | Checkpointing Mode | Exactly Once | | Checkpoint Storage | FileSystemCheckpointStorage | | State Backend | EmbeddedRocksDBStateBackend | | Interval | 8m 0s | 我有一个程序,主要是用来统计一些热门商品之类的数据 具体代码如下: .keyBy(data -> data.getShopId() + data.getYh_productid()) .window(TumblingEventTimeWindows.of(Time.minutes(30))) .sum("count").name("clickCount"); 按照官方文档的说法,状态在窗口触发后应该就会清除。但是我在webui上以及grafana监控上看到的checkpoint大小一直在增长。 webui:https://pic.imgdb.cn/item/64bf3efa1ddac507cc6484d5.jpg grafana:https://pic.imgdb.cn/item/64bf3fb71ddac507cc65a2c4.jpg 想知道下这个具体的原因可能是啥
flink如何正确使用mybatis
在flink内需要使用mybatis做些简化查询的工作,目前我的使用方式如下 public class MybatisUtil { private static final Logger LOGGER = LogFactory.createNewLogger("MybatisUtil"); private static ThreadLocal tl = new ThreadLocal(); private static SqlSessionFactory factory = null; //private static SqlSession sqlSession = null; static { // 1 读取配置文件 config.xml InputStream in = null; try { in = Resources.getResourceAsStream("batis.xml"); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); throw new RuntimeException(e); } // 2 创建SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(in); } public static SqlSession getSqlSession(){ SqlSession sqlSession = tl.get(); if(sqlSession == null){ sqlSession = factory.openSession(); tl.set(sqlSession); LOGGER.info("sqlSession创建成功,连接为:{},时间为:{}", sqlSession,LocalTimeUtil.now()); } return sqlSession; } } 以上是工具类 我在open方法中获取sqlsession,然后在invoke方法中使用mapper public void open(Configuration parameters) throws Exception { sqlSession = MybatisUtil.getSqlSession(); } public List map(HeaderFullWithPreOrder headerFullWithPreOrder) throws Exception { SelectAddCartMapper mapper = sqlSession.getMapper(SelectAddCartMapper.class); ...其他省略 } 想问下这种方式使用是否正确。以及sqlsession是否需要关闭,看见相关帖子有说如果sqlsession不关闭的话会把连接打满
Re:Flink cep busy
你好,整个程序有反压吗 在 2023-07-10 15:32:44,"jiaot...@mail.jj.cn" 写道: >Hello, > 我定义了一个pattern (a->b->c->d->e->f->g)在10分钟内匹配,通过在WebUI上查看任务很快在cep节点 > busy(max)100%,我发现通过增加cep节点的并发度并不能解决问题,且checkpoint随着时间的推移状态大小越来越大,数据应该存在大量堆积。数据源同时消费4个kafka > topic > (setTopics),采用默认水位线间隔时间,我发现4个topic的数据流量存在比较大的差异,因此我增加了水位线触发间隔时间,同时水位线时间戳来自于四个topic中最小的数值。但是问题依然没有解决,cep节点在几个小时后依然busy。 >@Override >public void onEvent(LobbyPathData lobbyPathData, long l, WatermarkOutput >watermarkOutput) { >String key = lobbyPathData.getProject() + lobbyPathData.getEvent_name(); >if (!maxTimePerTopic.containsKey(key) || l > maxTimePerTopic.get(key)) { >maxTimePerTopic.put(key, l); >} >} > >@Override >public void onPeriodicEmit(WatermarkOutput watermarkOutput) { >Optional min = > maxTimePerTopic.values().stream().min(Comparator.comparingLong(Long::valueOf)); >min.ifPresent(t -> watermarkOutput.emitWatermark(new Watermark(t - > outOfOrdernessMillis - 1))); >}
Re:flink jdbcsink 连接数的问题
hi, jdbc创建链接是在SimpleJdbcConnectionProvider这个类中实现的,至于真正创建链接,则是由DriverManager来处理。 关于连接数,则是跟你的并行度有关。 在 2023-05-30 13:55:57,"小昌同学" 写道: >各位老师,请教一下关于flink jdbcsink 连接数的问题; >我使用代码如下:在以下代码中,我查看了一下源码,并没有找到sink到MySQL时关于连接数的设定,请问这一块关于连接数的设定我应该怎么写呀; >谢谢各位老师的指导 > >| >outPutInfoStream.addSink(JdbcSink.sink( >"REPLACE into InputInfo (breakCode, breakName, breakDuration, >breakRule,breakPrimaryKey,breakStep,breakStepType,breakTime,breakSendTime,breakArgs) > values (?,?,?,?,?,?,?,?,?,?)", >(statement, InPutInfo) -> { >statement.setString(1,InPutInfo.getBreakCode()); >statement.setString(2,InPutInfo.getBreakName()); >statement.setLong(3,InPutInfo.getBreakDuration()); >statement.setString(4,InPutInfo.getBreakRule()); >statement.setString(5,InPutInfo.getBreakPrimaryKey()); >statement.setString(6, InPutInfo.getBreakStep()); >statement.setString(7, InPutInfo.getBreakStepType()); >statement.setString(8,InPutInfo.getBreakTime()); >statement.setString(9, DateUtil.format(new Date())); >statement.setString(10, > String.valueOf(InPutInfo.getBreakArgs())); >}, >JdbcExecutionOptions.builder() >.withBatchSize(10) >.withBatchIntervalMs(200) >.withMaxRetries(5) >.build(), >new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() > > .withUrl("jdbc:mysql://111/iap?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true") >.withDriverName("com.mysql.jdbc.Driver") >.withUsername("111") >.withPassword("111") >.build() >)).name("sink-mysql"); >| > > >| | >小昌同学 >| >| >ccc0606fight...@163.com >|
Flink使用精准一次写入kafka报错
上封邮件发错了,重新发一下。项目中使用精准一次语义写入kafka,代码和配置如下: 写入代码如下: Properties producerProperties = MyKafkaUtil.getProducerProperties(); KafkaSink kafkaSink = KafkaSink.builder() .setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server")) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic(Event2Kafka.parameterTool.get("feature.topic.name")) .setValueSerializationSchema(new SimpleStringSchema()) .build()) .setKafkaProducerConfig(producerProperties) .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .setTransactionalIdPrefix("streamx_flow_1261") .build(); eventJsonStream.sinkTo(kafkaSink).setParallelism(14) .name("event2kafka").uid("kafkasink"); kafka配置如下: public static Properties getProducerProperties(){ Properties kafkaProducerProps = new Properties(); kafkaProducerProps.setProperty("bootstrap.servers", parameterTool.get(bootstrap.server")); kafkaProducerProps.setProperty("auto.commit.interval.ms", "5000"); kafkaProducerProps.setProperty("auto.offset.reset", "latest"); kafkaProducerProps.setProperty("session.timeout.ms", "5000"); kafkaProducerProps.setProperty("transaction.timeout.ms",12*6 +""); kafkaProducerProps.put("security.protocol", "SASL_PLAINTEXT"); kafkaProducerProps.put("sasl.kerberos.service.name","kafka"); return kafkaProducerProps; } 项目运行很久都没啥问题,最近突然报了以下的错误 org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka -topic-2@-1 with FlinkKafkaInternalProducer{transactionalId='streamx_flow_1261-8-5', inTransaction=true, closed=false} at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:436) at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:417) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number. 参考了stackoverflow上面的回答:https://stackoverflow.com/questions/55192852/transactional-producer-vs-just-idempotent-producer-java-exception-outoforderseq 但是里面涉及到的参数我都没有设置,都是使用默认的配置。照理来说应该不会有这样的问题。想请问下各位有没有什么看法。还是我的配置有啥错误和缺少的地方。
Flink使用精准一次写入kafka报错
项目中使用精准一次语义写入kafka,代码和配置如下: Properties producerProperties = MyKafkaUtil.getProducerProperties(); KafkaSink kafkaSink = KafkaSink.builder() .setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server")) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic(Event2Kafka.parameterTool.get("feature.topic.name")) .setValueSerializationSchema(new SimpleStringSchema()) .build()) .setKafkaProducerConfig(producerProperties) .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .setTransactionalIdPrefix("streamx_flow_1261") .build(); eventJsonStream.sinkTo(kafkaSink).setParallelism(14) .name("event2kafka").uid("kafkasink"); public static Properties getProducerProperties(){ Properties kafkaProducerProps = new Properties(); kafkaProducerProps.setProperty("bootstrap.servers", parameterTool.get(bootstrap.server")); kafkaProducerProps.setProperty("auto.commit.interval.ms", "5000"); kafkaProducerProps.setProperty("auto.offset.reset", "latest"); kafkaProducerProps.setProperty("session.timeout.ms", "5000"); kafkaProducerProps.setProperty("transaction.timeout.ms",12*6 +""); kafkaProducerProps.put("security.protocol", "SASL_PLAINTEXT"); kafkaProducerProps.put("sasl.kerberos.service.name","kafka"); return kafkaProducerProps; }
Re:回复: flink 窗口触发计算的条件
你好,可以先看看官方文档中关于事件时间和水印的介绍 https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/time/ 如果你发了多条数据,但是都是同样数据的话,水印没有推进,窗口就不会触发 在 2023-05-25 10:00:36,"小昌同学" 写道: >是的 我发送了很多数据,发现窗口还是没有触发 > > >| | >小昌同学 >| >| >ccc0606fight...@163.com >| > 回复的原邮件 >| 发件人 | yidan zhao | >| 发送日期 | 2023年5月25日 09:59 | >| 收件人 | | >| 主题 | Re: flink 窗口触发计算的条件 | >如果你只发送了一条数据,那么watermark不会推进,就不会触发窗口计算。你需要更多数据。 > >小昌同学 于2023年5月25日周四 09:32写道: > >各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题; >我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒, >但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在: >相关代码以及样例数据如下: >| >package job; >import bean.MidInfo3; >import bean.Result; >import bean2.BaseInfo2; >import com.alibaba.fastjson.JSON; >import com.alibaba.fastjson.JSONObject; >import config.FlinkConfig; >import org.apache.flink.api.common.eventtime.WatermarkStrategy; >import org.apache.flink.api.common.functions.FilterFunction; >import org.apache.flink.api.common.functions.JoinFunction; >import org.apache.flink.api.common.functions.MapFunction; >import org.apache.flink.api.common.serialization.SimpleStringSchema; >import org.apache.flink.api.common.state.StateTtlConfig; >import org.apache.flink.api.common.state.ValueState; >import org.apache.flink.api.common.state.ValueStateDescriptor; >import org.apache.flink.api.java.functions.KeySelector; >import org.apache.flink.configuration.Configuration; >import org.apache.flink.streaming.api.datastream.ConnectedStreams; >import org.apache.flink.streaming.api.datastream.DataStream; >import org.apache.flink.streaming.api.datastream.DataStreamSource; >import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >import org.apache.flink.streaming.api.functions.ProcessFunction; >import org.apache.flink.streaming.api.functions.co.CoMapFunction; >import >org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; >import >org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; >import >org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; >import org.apache.flink.streaming.api.windowing.time.Time; >import >org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; >import org.apache.flink.streaming.api.windowing.windows.TimeWindow; >import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >import org.apache.flink.util.Collector; >import utils.DateUtil; >import utils.JdbcUtil; > >import java.sql.Connection; >import java.sql.PreparedStatement; >import java.sql.ResultSet; >import java.time.Duration; >import java.util.HashMap; >import java.util.Properties; > >public class RytLogAnly9 { >public static void main(String[] args) throws Exception { >StreamExecutionEnvironment env = >StreamExecutionEnvironment.getExecutionEnvironment(); >env.disableOperatorChaining(); >//1、消费Kafka中的数据 >String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers"); >String topicName = FlinkConfig.config.getProperty("dev_topicName"); >String groupId = FlinkConfig.config.getProperty("dev_groupId"); >String devMode = FlinkConfig.config.getProperty("dev_mode"); >Properties prop = new Properties(); >prop.setProperty("bootstrap.servers", servers); >prop.setProperty("group.id", groupId); >prop.setProperty("auto.offset.reset", devMode); >DataStreamSource sourceStream = env.addSource(new >FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop)); >sourceStream.print("最源端的数据sourceStream"); > >//2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据 >SingleOutputStreamOperator baseInfoStream = sourceStream.map(new >MapFunction() { >@Override >public BaseInfo2 map(String value) throws Exception { >JSONObject jsonObject = JSON.parseObject(value); >//获取到不同的服务器IP >String serverIp = jsonObject.getString("ip"); >//获取到不同的data的数据 >String datas = jsonObject.getString("data"); > >String[] splits = datas.split("\n"); >HashMap dataMap = new HashMap<>(); >//将time填充到自定义类型中,用来判断同一个num的请求以及应答时间 >String time = splits[0].substring(7, 19).replace("-", "").trim(); >//将subData填充到自定义类型中,用来判断时请求还是应答 >String subData = datas.substring(0, 10); >for (int i = 0; i < splits.length; i++) { >if (splits[i].contains("=")) { >splits[i] = splits[i].replaceFirst("=", "&"); >String[] temp = splits[i].split("&"); >if (temp.length > 1) { >dataMap.put(temp[0].toLowerCase(), temp[1]); >} >} >} >return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, >DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, >System.currentTimeMillis()); >} >}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, > recordTimestamp) -> element.getEvenTime())); >baseInfoStream.print("不加功能描述的 baseInfoStream"); > >//3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述 >SingleOutputStreamOperator completeInfoStream = >baseInfoStream.map(new
Re:回复:报错显示为bug
link.api.common.InvalidProgramException: Table program cannot be >compiled. This is a bug. Please file an issue. > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74) > ... 13 more >Caused by: org.apache.flink.api.common.InvalidProgramException: Table program >cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89) > at > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) > ... 16 more >Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column >103: Cannot cast "java.lang.String" to "java.time.LocalDateTime" > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051) > at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396) > at org.codehaus.janino.Java$Cast.accept(Java.java:4898) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057) > at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409) > at > org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400) > at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924) > at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400) > at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396) > at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182) > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182) > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783) > at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762) > at > org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) > at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) > at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) > at > org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) > at > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86) > ... 22 more > >Process finished with exit code 1 > >| > > >| | >小昌同学 >| >| >ccc0606fight...@163.com >| > 回复的原邮件 >| 发件人 | lxk | >| 发送日期 | 2023年5月15日 18:21 | >| 收件人 | | >| 主题 | Re:报错显示为bug | >你好,可以把相关代码贴上来吗,方便大家进行分析。如果使用sql的话还可以把执行计划贴上来。 > > > > > > > > > > > > > > > > > >在 2023-05-15 17:11:42,"小昌同学" 写道: >各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by: >org.apache.flink.api.common.InvalidProgramException: Table program cannot be >compiled. This is a bug. Please file an issue. “ >flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作 > > >| | >小昌同学 >| >| >ccc0606fight...@163.com >|
Re:报错显示为bug
你好,可以把相关代码贴上来吗,方便大家进行分析。如果使用sql的话还可以把执行计划贴上来。 在 2023-05-15 17:11:42,"小昌同学" 写道: >各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by: >org.apache.flink.api.common.InvalidProgramException: Table program cannot be >compiled. This is a bug. Please file an issue. “ >flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作 > > >| | >小昌同学 >| >| >ccc0606fight...@163.com >|
Re:Re: Re: Flink广播流状态清理策略不生效
好的,感谢 在 2023-05-15 15:49:12,"Hangxiang Yu" 写道: >Hi, 可以参考这个 Ticket ,就是讨论要给 Broadcast State 加 TTL 的,当时应该没有继续深入讨论: >https://issues.apache.org/jira/browse/FLINK-13721 >方便的话你可以在 Ticket 下面也分享下你的使用场景、观察到的现象吗?也可以在 Ticket 下 Vote for this issue. >我这边也会帮忙一起看下 > >On Mon, May 15, 2023 at 1:41 PM lxk wrote: > >> 这么看来,广播流好像不适合在生产中使用,状态会无限止的增长。这块官方有计划增加ttl功能吗。 >> 或者使用广播流的时候有没有什么能够手动清理状态的方法? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2023-05-15 11:28:54,"Hangxiang Yu" 写道: >> >Hi, 目前像 Broadcast state 这种 Operator State 应该是不支持 TTL 设置的,可以参考这里 >> >< >> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl >> >对 >> >State TTL 的描述; >> > >> >On Mon, May 15, 2023 at 11:05 AM lxk wrote: >> > >> >> flink版本:1.14 >> >> 目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。 >> >> 在主程序中,我设置了状态过期策略: >> >>SingleOutputStreamOperator baiduStream = >> >> env.addSource(adBaiduClick).map(data -> JSON.parseObject(data, >> >> AdvertiseClick.class)).name("BaiDuAdClick"); >> >> MapStateDescriptor baiduInfoMap = new >> >> MapStateDescriptor<>("advertiseInfo", String.class, >> AdvertiseClick.class); >> >> StateTtlConfig ttlConfig = StateTtlConfig >> >> .newBuilder(Time.days(7)) >> >> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) >> >> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) >> >> .cleanupFullSnapshot() >> >> .cleanupIncrementally(200, true) >> >> .build(); >> >> baiduInfoMap.enableTimeToLive(ttlConfig); >> >> 在BroadcastProcessFunction中,我也设置了状态清除策略: >> >> public void open(Configuration parameters) throws Exception { >> >> jedisClusterSink = Ad_MyRedisUtil.getJedisClient(); >> >> baiduInfoDesc = new MapStateDescriptor> >> AdvertiseClick>("advertiseInfo", String.class, AdvertiseClick.class); >> >> StateTtlConfig ttlConfig = StateTtlConfig >> >> .newBuilder(Time.days(7)) >> >> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) >> >> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) >> >> .cleanupFullSnapshot() >> >> .cleanupIncrementally(200, true) >> >> .build(); >> >> baiduInfoDesc.enableTimeToLive(ttlConfig); >> >> >> >> } >> >> 但是,从目前的checkpoint大小来看,状态清理策略似乎没有生效,程序已经运行了14天,但是整体的checkpoint还是一直在增长。 >> >> >> >> >> >> https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg >> >> >> >> >> >> >> >> >> >> 我是用其他状态的过期策略都是生效的,不知道为啥在广播流这里看起来没生效,还是我的使用方式有问题。希望大家能帮忙看看。 >> > >> > >> > >> >-- >> >Best, >> >Hangxiang. >> > > >-- >Best, >Hangxiang.
Re:Re: Flink广播流状态清理策略不生效
这么看来,广播流好像不适合在生产中使用,状态会无限止的增长。这块官方有计划增加ttl功能吗。 或者使用广播流的时候有没有什么能够手动清理状态的方法? 在 2023-05-15 11:28:54,"Hangxiang Yu" 写道: >Hi, 目前像 Broadcast state 这种 Operator State 应该是不支持 TTL 设置的,可以参考这里 ><https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl>对 >State TTL 的描述; > >On Mon, May 15, 2023 at 11:05 AM lxk wrote: > >> flink版本:1.14 >> 目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。 >> 在主程序中,我设置了状态过期策略: >>SingleOutputStreamOperator baiduStream = >> env.addSource(adBaiduClick).map(data -> JSON.parseObject(data, >> AdvertiseClick.class)).name("BaiDuAdClick"); >> MapStateDescriptor baiduInfoMap = new >> MapStateDescriptor<>("advertiseInfo", String.class, AdvertiseClick.class); >> StateTtlConfig ttlConfig = StateTtlConfig >> .newBuilder(Time.days(7)) >> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) >> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) >> .cleanupFullSnapshot() >> .cleanupIncrementally(200, true) >> .build(); >> baiduInfoMap.enableTimeToLive(ttlConfig); >> 在BroadcastProcessFunction中,我也设置了状态清除策略: >> public void open(Configuration parameters) throws Exception { >> jedisClusterSink = Ad_MyRedisUtil.getJedisClient(); >> baiduInfoDesc = new MapStateDescriptor> AdvertiseClick>("advertiseInfo", String.class, AdvertiseClick.class); >> StateTtlConfig ttlConfig = StateTtlConfig >> .newBuilder(Time.days(7)) >> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) >> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) >> .cleanupFullSnapshot() >> .cleanupIncrementally(200, true) >> .build(); >> baiduInfoDesc.enableTimeToLive(ttlConfig); >> >> } >> 但是,从目前的checkpoint大小来看,状态清理策略似乎没有生效,程序已经运行了14天,但是整体的checkpoint还是一直在增长。 >> >> >> https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg >> >> >> >> >> 我是用其他状态的过期策略都是生效的,不知道为啥在广播流这里看起来没生效,还是我的使用方式有问题。希望大家能帮忙看看。 > > > >-- >Best, >Hangxiang.
Flink广播流状态清理策略不生效
flink版本:1.14 目前使用的是对一个数据量比较小的流进行广播,另外的主流跟这个广播流进行匹配处理。 在主程序中,我设置了状态过期策略: SingleOutputStreamOperator baiduStream = env.addSource(adBaiduClick).map(data -> JSON.parseObject(data, AdvertiseClick.class)).name("BaiDuAdClick"); MapStateDescriptor baiduInfoMap = new MapStateDescriptor<>("advertiseInfo", String.class, AdvertiseClick.class); StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(7)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .cleanupFullSnapshot() .cleanupIncrementally(200, true) .build(); baiduInfoMap.enableTimeToLive(ttlConfig); 在BroadcastProcessFunction中,我也设置了状态清除策略: public void open(Configuration parameters) throws Exception { jedisClusterSink = Ad_MyRedisUtil.getJedisClient(); baiduInfoDesc = new MapStateDescriptor("advertiseInfo", String.class, AdvertiseClick.class); StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.days(7)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .cleanupFullSnapshot() .cleanupIncrementally(200, true) .build(); baiduInfoDesc.enableTimeToLive(ttlConfig); } 但是,从目前的checkpoint大小来看,状态清理策略似乎没有生效,程序已经运行了14天,但是整体的checkpoint还是一直在增长。 https://pic2.imgdb.cn/item/64619fef0d2dde54d4c6.jpg 我是用其他状态的过期策略都是生效的,不知道为啥在广播流这里看起来没生效,还是我的使用方式有问题。希望大家能帮忙看看。
Re:Re: Re: Flink程序内存Dump不了
非常感谢,切换账户之后,有些job是能dump下来,但是有些job还是会报相同的错误,并且已经确认过flink作业的user和运行linux命令的user是一致的。 在 2023-02-21 11:26:07,"Biao Geng" 写道: >Hi, >这个报错 sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410 >可能和flink关系不大。 >我们之前在生产环境中dump内存的时候遇到过类似问题,后续定位发现是运行jmap命令的Linux user与运行flink作业的Linux >user不一致导致的。 >不知道和你们的问题一不一致,你可以试试t用op -p 查到USER,然后su 一下,再进行jmap >-dump:format=b,file=/tmp/dump.hprof 试试。 > >Best, >Biao Geng > >Weihua Hu 于2023年2月20日周一 14:03写道: > >> Hi, >> >> 可以把心跳超时(heartbeat.timeout)[1]也调大再尝试 dump 内存。 >> >> >> [1] >> >> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-fault-tolerance-options >> >> Best, >> Weihua >> >> >> On Mon, Feb 20, 2023 at 1:58 PM lxk wrote: >> >> > 我尝试调整了参数,具体数值如下 >> > >> > >> > akka.ask.timeout: 900s >> > >> > >> > >> > 但还是报同样的错 >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > 在 2023-02-17 17:32:51,"Guo Thompson" 写道: >> > >可能是jm 和 tm之间的心跳时间太短了, dump的过程会stop the world,tm就不响应jm的heartbeat了; >> > > >> > >lxk 于2023年2月14日周二 14:32写道: >> > > >> > >> Flink version:1.16 >> > >> java version: jdk1.8.0_251 >> > >> 问题:最近上线的Flink程序,频繁young >> > >> >> > >> gc,几秒一次,在调整了新生代大小之后,还是没有解决,目前整个jvm堆大小是3.57g。因此想通过程序内存情况来分析哪里问题有问题,我们通过yarn上的applicationId,使用ps >> > >> -ef|grep 1666758697316_2639108,找到对应的pid,最后执行 jmap -dump:format >> > >> b,file=user.dump 26326 >> > >> >> > >> 命令生成dump文件,但我们测试了很多个程序,只要一开始dump,都会对线上程序产生影响,程序的container会莫名的死掉,然后程序重启。具体执行命令后的报错如下: >> > >> sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410 >> > >> https://pic.imgdb.cn/item/63eb2a46f144a010071899ba.png >> > >> 不知道大家有没有遇见这个问题,是我们使用的姿势不对,还是目前使用的版本有什么问题,希望大家能够给出一些建议和看法。 >> > >>
Re:Re: Flink程序内存Dump不了
我尝试调整了参数,具体数值如下 akka.ask.timeout: 900s 但还是报同样的错 在 2023-02-17 17:32:51,"Guo Thompson" 写道: >可能是jm 和 tm之间的心跳时间太短了, dump的过程会stop the world,tm就不响应jm的heartbeat了; > >lxk 于2023年2月14日周二 14:32写道: > >> Flink version:1.16 >> java version: jdk1.8.0_251 >> 问题:最近上线的Flink程序,频繁young >> gc,几秒一次,在调整了新生代大小之后,还是没有解决,目前整个jvm堆大小是3.57g。因此想通过程序内存情况来分析哪里问题有问题,我们通过yarn上的applicationId,使用ps >> -ef|grep 1666758697316_2639108,找到对应的pid,最后执行 jmap -dump:format >> b,file=user.dump 26326 >> 命令生成dump文件,但我们测试了很多个程序,只要一开始dump,都会对线上程序产生影响,程序的container会莫名的死掉,然后程序重启。具体执行命令后的报错如下: >> sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410 >> https://pic.imgdb.cn/item/63eb2a46f144a010071899ba.png >> 不知道大家有没有遇见这个问题,是我们使用的姿势不对,还是目前使用的版本有什么问题,希望大家能够给出一些建议和看法。
Re:Re: Flink1.16写入kafka 报错:Cluster authorization failed.
我们改了权限确实解决了这个问题。但我现在想了解的是为什么Flink在1.16的时候需要创建producerID的权限,以及这个权限是不是针对新老Kafka API都需要的。针对新的Kafka API在精准一次的时候需要管理ProducerID在源码中有体现,但是老的API没看见相关的,只使用了一个ProducerID也需要由Flink内部自己管理吗? 在 2023-02-20 08:45:18,"Shammon FY" 写道: >Hi > >从`Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: >Cluster authorization failed.`这个错误看起来像是权限错误,可以你检查下是否有权限问题 > >Best, >Shammon > >On Fri, Feb 17, 2023 at 6:29 PM lxk wrote: > >> Flink版本:1.16 >> 目前公司针对Flink版本进行升级,从Flink1.14升级到Flink1.16,代码没做任何调整,但是在写入kafka的时候报错: >> 2023-02-17 15:03:19 >> org.apache.kafka.common.KafkaException: Cannot execute transactional >> method because we are in an error state >> at >> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125) >> at >> org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442) >> at >> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998) >> at >> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912) >> at >> org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:197) >> at >> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160) >> at >> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) >> at org.apache.flink.streaming.runtime.io >> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) >> at org.apache.flink.streaming.runtime.io >> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) >> at org.apache.flink.streaming.runtime.io >> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) >> at >> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) >> at >> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) >> at >> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: >> Cluster authorization failed. >> >> >> 在了解了相关源码之后,知道KafkaSink这种新的kafka >> api在实现精准一次的时候,分为了两个阶段,一个是writer,一个是commiter,其中在writer中维护了一个producerpool,因此需要权限创建producer,这块能理解。 >> 但是在使用老的kafka >> api,即FlinkKafkaProducer时,只需要维护一个Producer。不明白为啥在使用老的api的时候还是会报同样的错误。 >> >> >> 或者我说的原因不是这个报错的根本原因,希望大家能帮忙解答下
Flink1.16写入kafka 报错:Cluster authorization failed.
Flink版本:1.16 目前公司针对Flink版本进行升级,从Flink1.14升级到Flink1.16,代码没做任何调整,但是在写入kafka的时候报错: 2023-02-17 15:03:19 org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125) at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912) at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:197) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed. 在了解了相关源码之后,知道KafkaSink这种新的kafka api在实现精准一次的时候,分为了两个阶段,一个是writer,一个是commiter,其中在writer中维护了一个producerpool,因此需要权限创建producer,这块能理解。 但是在使用老的kafka api,即FlinkKafkaProducer时,只需要维护一个Producer。不明白为啥在使用老的api的时候还是会报同样的错误。 或者我说的原因不是这个报错的根本原因,希望大家能帮忙解答下
Re:Flink on yarn 运行一段时间出现 TaskManager with id is no longer reachable
你好,可以dump下内存分析 在 2023-02-16 10:05:19,"Fei Han" 写道: >@all >大家好!我的Flink 版本是1.14.5。CDC版本是2.2.1。在on yarn 运行一段时间后会出现如下报错: >org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id >container_e506_1673750933366_49579_01_02(hdp-server-010.yigongpin.com:8041) > is no longer reachable. at >org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1359) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] at >org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.reportHeartbeatRpcFailure(HeartbeatMonitorImpl.java:123) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] at >org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] at >org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] at >org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] at >org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248) > ~[flink-dist_2.12-1.14.5.jar:1.14.5] at >java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > ~[?:1.8.0_181] at >java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > ~[?:1.8.0_181] at >java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > ~[?:1.8.0_181] at >org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) > ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at >org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at >org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) > ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at >org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) > ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at >org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) > ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at >org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) > ~[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at >akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) >[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at >akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) >[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at >scala.PartialFunction.applyOrElse(PartialFunction.scala:123) >[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] at >scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) >[flink-rpc-akka_dec09d13-99a1-420c-b835-8157413a3db0.jar:1.14.5] >在以上报错后,还会出现如下checkpoint报错:org.apache.flink.runtime.checkpoint.CheckpointException: > Checkpoint expired before completing. at >org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000) > [flink-dist_2.12-1.14.5.jar:1.14.5] at >java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >[?:1.8.0_181] at java.util.concurrent.FutureTask.run(FutureTask.java:266) >[?:1.8.0_181] at >java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > [?:1.8.0_181] at >java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > [?:1.8.0_181] at >java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_181] at >java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_181] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]。 >请教下大佬们!这2个地方还怎么优化呢?有什么好的方法没有。
Flink程序内存Dump不了
Flink version:1.16 java version: jdk1.8.0_251 问题:最近上线的Flink程序,频繁young gc,几秒一次,在调整了新生代大小之后,还是没有解决,目前整个jvm堆大小是3.57g。因此想通过程序内存情况来分析哪里问题有问题,我们通过yarn上的applicationId,使用ps -ef|grep 1666758697316_2639108,找到对应的pid,最后执行 jmap -dump:format b,file=user.dump 26326 命令生成dump文件,但我们测试了很多个程序,只要一开始dump,都会对线上程序产生影响,程序的container会莫名的死掉,然后程序重启。具体执行命令后的报错如下: sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410 https://pic.imgdb.cn/item/63eb2a46f144a010071899ba.png 不知道大家有没有遇见这个问题,是我们使用的姿势不对,还是目前使用的版本有什么问题,希望大家能够给出一些建议和看法。
Re:Re: Flink SQL 如何优化以及处理反压
现在从web ui上看,瓶颈主要在于group by 聚合函数之后去重这个逻辑。 而且SQL这个并行度是全局设置的,没法针对某一个特定的算子设置并行度,并行度多了之后,资源又感觉有点吃紧。 在 2023-01-31 17:45:15,"weijie guo" 写道: >最好先找到导致下游处理过慢的瓶颈算子,适当扩大一下并发。如果还不行,看下jstack的情况,可能需要调整逻辑。 > >Best regards, > >Weijie > > >ssmq <374060...@qq.com.invalid> 于2023年1月31日周二 17:22写道: > >> 你可以测试不写入clickhouse是否还存在反压,如果不是因为写入瓶颈的话就从你的处理逻辑优化了 >> >> >> 发件人: lxk >> 发送时间: 2023年1月31日 15:16 >> 收件人: user-zh@flink.apache.org >> 主题: Flink SQL 如何优化以及处理反压 >> >> Flink版本:1.16.0 >> 目前在使用Flink SQL进行多流关联,并写入Clickhouse中 >> 具体代码如下: >> select \ >> header.id as id, \ >> LAST_VALUE(header.order_status), \ >> LAST_VALUE(header.customer_id), \ >> LAST_VALUE(header.shop_id), \ >> LAST_VALUE(header.parent_order_id), \ >> LAST_VALUE(header.order_at), \ >> LAST_VALUE(header.pay_at), \ >> LAST_VALUE(header.channel_id), \ >> LAST_VALUE(header.root_order_id), \ >> LAST_VALUE(header.last_updated_at), \ >> item.id as item_id, \ >> LAST_VALUE(item.order_id) as order_id, \ >> LAST_VALUE(item.row_num), \ >> LAST_VALUE(item.goods_id), \ >> LAST_VALUE(item.s_sku_code), \ >> LAST_VALUE(item.qty), \ >> LAST_VALUE(item.p_paid_sub_amt), \ >> LAST_VALUE(item.p_sp_sub_amt), \ >> LAST_VALUE(item.bom_type), \ >> LAST_VALUE(item.last_updated_at) as item_last_updated_at, \ >> LAST_VALUE(item.display_qty), \ >> LAST_VALUE(delivery.del_type), \ >> LAST_VALUE(delivery.time_slot_type), \ >> LAST_VALUE(delivery.time_slot_date), \ >> LAST_VALUE(delivery.time_slot_time_from), \ >> LAST_VALUE(delivery.time_slot_time_to), \ >> LAST_VALUE(delivery.sku_delivery_type), \ >> LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \ >> LAST_VALUE(promotion.id) as promo_id, \ >> LAST_VALUE(promotion.order_item_id), \ >> LAST_VALUE(promotion.p_promo_amt), \ >> LAST_VALUE(promotion.promotion_category), \ >> LAST_VALUE(promotion.promo_type), \ >> LAST_VALUE(promotion.promo_sub_type), \ >> LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \ >> LAST_VALUE(promotion.promotion_cost) \ >> from \ >> item \ >> join \ >> header \ >> on item.order_id = header.id \ >> left join \ >> delivery \ >> on item.order_id = delivery.order_id \ >> left join \ >> promotion \ >> on item.id =promotion.order_item_id \ >> group by header.id,item.id >> 在Flink WEB UI 上发现程序反压很严重,而且时不时挂掉: >> https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg >> >> 参考了京东的一篇文章 >> https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc >> ,对相关参数做了调整,但是发现有些功能在Flink 1.16中已经做了相关优化了,同时加了这些参数之后对程序没有起到任何优化的作用。 >> >> conf.setString("table.exec.mini-batch.enabled", "true"); >> conf.setString("table.exec.mini-batch.allow-latency", "15 s"); >> conf.setString("table.exec.mini-batch.size", "5000"); >> conf.setString("table.exec.state.ttl", "86400 s"); >> conf.setString("table.exec.disabled-operators", "NestedLoopJoin"); >> conf.setString("table.optimizer.join.broadcast-threshold", "-1"); >> conf.setString("table.optimizer.multiple-input-enabled", "true"); >> conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED"); >> conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8"); >> 想请教下,针对Flink SQL如何处理反压,同时有什么其他的优化手段? >> >> >> >>
Flink SQL 如何优化以及处理反压
Flink版本:1.16.0 目前在使用Flink SQL进行多流关联,并写入Clickhouse中 具体代码如下: select \ header.id as id, \ LAST_VALUE(header.order_status), \ LAST_VALUE(header.customer_id), \ LAST_VALUE(header.shop_id), \ LAST_VALUE(header.parent_order_id), \ LAST_VALUE(header.order_at), \ LAST_VALUE(header.pay_at), \ LAST_VALUE(header.channel_id), \ LAST_VALUE(header.root_order_id), \ LAST_VALUE(header.last_updated_at), \ item.id as item_id, \ LAST_VALUE(item.order_id) as order_id, \ LAST_VALUE(item.row_num), \ LAST_VALUE(item.goods_id), \ LAST_VALUE(item.s_sku_code), \ LAST_VALUE(item.qty), \ LAST_VALUE(item.p_paid_sub_amt), \ LAST_VALUE(item.p_sp_sub_amt), \ LAST_VALUE(item.bom_type), \ LAST_VALUE(item.last_updated_at) as item_last_updated_at, \ LAST_VALUE(item.display_qty), \ LAST_VALUE(delivery.del_type), \ LAST_VALUE(delivery.time_slot_type), \ LAST_VALUE(delivery.time_slot_date), \ LAST_VALUE(delivery.time_slot_time_from), \ LAST_VALUE(delivery.time_slot_time_to), \ LAST_VALUE(delivery.sku_delivery_type), \ LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \ LAST_VALUE(promotion.id) as promo_id, \ LAST_VALUE(promotion.order_item_id), \ LAST_VALUE(promotion.p_promo_amt), \ LAST_VALUE(promotion.promotion_category), \ LAST_VALUE(promotion.promo_type), \ LAST_VALUE(promotion.promo_sub_type), \ LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \ LAST_VALUE(promotion.promotion_cost) \ from \ item \ join \ header \ on item.order_id = header.id \ left join \ delivery \ on item.order_id = delivery.order_id \ left join \ promotion \ on item.id =promotion.order_item_id \ group by header.id,item.id 在Flink WEB UI 上发现程序反压很严重,而且时不时挂掉: https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg 参考了京东的一篇文章https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc ,对相关参数做了调整,但是发现有些功能在Flink 1.16中已经做了相关优化了,同时加了这些参数之后对程序没有起到任何优化的作用。 conf.setString("table.exec.mini-batch.enabled", "true"); conf.setString("table.exec.mini-batch.allow-latency", "15 s"); conf.setString("table.exec.mini-batch.size", "5000"); conf.setString("table.exec.state.ttl", "86400 s"); conf.setString("table.exec.disabled-operators", "NestedLoopJoin"); conf.setString("table.optimizer.join.broadcast-threshold", "-1"); conf.setString("table.optimizer.multiple-input-enabled", "true"); conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED"); conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8"); 想请教下,针对Flink SQL如何处理反压,同时有什么其他的优化手段?
Re:Re: 如何在flink中正确使用外部数据库连接
谢谢 我现在使用的是直连的方式,也没有关闭preparedstatement和resultset,但是没有发生过内存泄漏的问题,请问了解这背后的原因吗 在 2022-07-25 13:53:42,"Lijie Wang" 写道: >Hi, >根据我的经验,使用连接池时,至少需要及时关掉 statement/ResultSet,否则查询的结果会一直缓存,会有内存泄漏的问题。 > >Best, >Lijie > >lxk7...@163.com 于2022年7月23日周六 15:34写道: > >> >> 目前的项目中,需要使用外部数据库进行实时的look up。实时的主流数据量一天在百万级别,能用到的外部数据库有Mysql,Clickhouse >> 以及Redis缓存。 >> 现在是将数据实时落到Clickhouse,然后Flink实时的去look up >> clickhouse。(虽然知道Clickhouse并发性能不强,但目前能用的就只有这个了,需要存储千万级别的数据) >> 测试了两种方式: >> >> 1.使用JDBC连接池的方式去查询,Druid连接池以及C3P0连接池都用过,但是程序都是运行一段时间就会报OOM(有可能是使用方式不对)。通过dump日志排查的时候发现连接池的方式会将很多信息保留下来,所以最终没有使用这种方式。同时的话,在flink内部使用连接池的时候也没有显示的关闭连接。只在Close方法中调用了关闭。 >> 2.使用DriverManager获取连接查询。这种方式目前测试下来,程序是稳定运行的,也没有报OOM。同时也没有去关闭连接。 >> >> 问题:1.如何正确在flink内部使用外部数据库连接,使用数据池的方式,个人理解连接的管理都是由数据池来做的,所以不需要去显示close。同时的话,个人认为实时的程序去查,这个连接就会一直占用着,也无需关闭。简言之,无论是数据池还是直连,需不需要在invoke方法中关闭连接?还是只用在close方法中关闭连接。 >> 2.这种实时的look up除了缓存之外还有没有其他更好的优化手段?或者有什么其他的方案可以替代? >> >> >> lxk7...@163.com >>
Re:Re: Flink interval join 水印疑问
按照这个说法,那么timestamp和watermark其实没有关系。 但是我看到有关帖子里说:双流join里存储的mapstate。 而StreamRecord和watermark都是继承于streamelement,Flink会替换StreamRecord 对象中的Timestamp,如果 根据当前事件的Timestamp 生成的Watermark 大于上一次的Watermark,就发出新的Watermark。 具体代码在 TimestampsAndPunctuatedWatermarksOperator.processElement。 @OverridepublicvoidprocessElement(StreamRecord element)throws Exception { finalTvalue= element.getValue(); // 调用 用户实现的 extractTimestamp 获取新的TimestampfinallongnewTimestamp= userFunction.extractTimestamp(value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); // 用新Timestamp 替换StreamRecord中的旧Timestamp output.collect(element.replace(element.getValue(), newTimestamp)); // 调用 用户实现的 checkAndGetNextWatermark 方法获取下一个WatermarkfinalWatermarknextWatermark= userFunction.checkAndGetNextWatermark(value, newTimestamp); // 如果下一个Watermark 大于当前Watermark,就发出新的Watermarkif (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) { currentWatermark = nextWatermark.getTimestamp(); output.emitWatermark(nextWatermark); } } 以上是我看见的帖子中的相关内容 如果上述说法不对的话,那么在双流join中,watermark是怎么流转的? 在 2022-07-07 10:03:00,"yidan zhao" 写道: >timestamp是为每个element(输入的记录)赋值的一个时间戳。 >watermark是从source部分生成的水印个,然后向后传播。 > >以分窗口为例,数据记录的timestamp用于决定数据划分入哪个窗口。 >watermark用于决定窗口啥时候闭合,比如窗口是0-5s,那么当watermark达到5s的时候,窗口就会闭合。 > >考虑数据不一定能及时到达,可以让watermark=max(timestamp)-30s。30s即可容忍给的数据乱序的程度。 > >lxk 于2022年7月6日周三 13:36写道: >> >> 在使用interval join的时候有一些疑问,希望大家能够帮忙解答一下 >> https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png >> 官方文档中说会从两个流中的 timestamp 中取最大值,看了下源码确实是这样 >> https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png >> 我的问题是: >> 1.这里的timestamp和watermark有什么区别? >> 2.interval >> join中watermark是怎么计算的?两个流取最大的timestamp之后,watermark跟这个最大的timestamp是否有某种联系
Flink interval join 水印疑问
在使用interval join的时候有一些疑问,希望大家能够帮忙解答一下 https://pic.imgdb.cn/item/62c5015b5be16ec74ac2b23f.png 官方文档中说会从两个流中的 timestamp 中取最大值,看了下源码确实是这样 https://pic.imgdb.cn/item/62c51e8b5be16ec74ae22880.png 我的问题是: 1.这里的timestamp和watermark有什么区别? 2.interval join中watermark是怎么计算的?两个流取最大的timestamp之后,watermark跟这个最大的timestamp是否有某种联系
Re:使用join+聚合时,checkpoint异常
你好,图片挂了,可以尝试使用图床工具上传图片。 在 2022-06-21 09:42:54,"amber_...@qq.com.INVALID" 写道: 您好! 我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务; 当我提交普通数据同步任务时,一切正常; 当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed Memory使用率始终是100%; 以下是我的checkpoint配置: 我尝试增加Task Managed内存,但使用率总是100%; 当我关闭增量检查点时,无任何变化; 当我将State Backend切换为hashmap时,Managed Memory使用率回归正常,但checkpoint仍然无法工作; 期待你的回复。 祝好! amber_...@qq.com
Re:Re: Re:Re: Re: Flink 使用interval join数据丢失疑问
我大致理解了,数据其实是在关联之前就丢掉了。之前了解的最多的是interval join,目前来看我这种场景其实使用inner join比较合适,这个水印确实感觉挺难很合理的去设置。 在 2022-06-15 12:06:56,"Zhiwen Sun" 写道: >我猜测是 watermark 的问题, 看楼主的设置, watermark 是 -2s ,也就是说, order header 流,有数据晚了 2s >,就会被丢弃。 > >楼主之前看的也是 订单明细比订单主表晚几秒, 这只是同一个订单的数据生成时间差异。 如果是这样的话,使用一般的 inner join + ttl >就可以满足需求了。 > >BTW: watermark 我觉得很难使用好,实际使用场景非常有限。 > > > >Zhiwen Sun > > > >On Wed, Jun 15, 2022 at 11:43 AM Shengkai Fang wrote: > >> > 我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval join开分钟级别的数据还要准确 >> >> 不合理的 watermark 设置在 interval join 就会导致丢数据。设置 ttl 情况下,如果某个 key >> 的数据频繁访问情况下,那么这个数据就不会过期。 >> >> > 我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志。 >> >> 我记得日志是会打印相关的日志。能提一些相关的日志吗? >> >> best, >> Shengkai >> >> lxk 于2022年6月14日周二 20:04写道: >> >> > Hi, >> > 我目前使用sql interval join,窗口的上下界增加到分钟级别,分别是-2 minute 和 +4 minute >> > 目前来看数据量和使用inner join要差不多了。以下是代码 >> > Table headerTable = >> > streamTableEnvironment.fromDataStream(headerFilterStream, >> > Schema.newBuilder() >> > .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS >> > TIMESTAMP_LTZ(3))") >> > .watermark("rowtime", "rowtime + INTERVAL '2' SECOND") >> > .build()); >> > Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream, >> > Schema.newBuilder() >> > .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS >> > TIMESTAMP_LTZ(3))") >> > .watermark("rowtime", "rowtime + INTERVAL '2' SECOND") >> > .build()); >> > >> > >> > streamTableEnvironment.createTemporaryView("header",headerTable); >> > streamTableEnvironment.createTemporaryView("item",itemTable); >> > Table result = streamTableEnvironment.sqlQuery("select >> header.customer_id" >> > + >> > ",item.goods_id" + >> > ",header.id" + >> > ",header.order_status" + >> > ",header.shop_id" + >> > ",header.parent_order_id" + >> > ",header.order_at" + >> > ",header.pay_at" + >> > ",header.channel_id" + >> > ",header.root_order_id" + >> > ",item.id" + >> > ",item.row_num" + >> > ",item.p_sp_sub_amt" + >> > ",item.display_qty" + >> > ",item.qty" + >> > ",item.bom_type" + >> > " from item JOIN header on header.id = item.order_id and item.rowtime >> > BETWEEN header.rowtime - INTERVAL '2' MINUTE AND header.rowtime + >> INTERVAL >> > '4' MINUTE"); >> > >> > 对此,我又有新的疑问了,我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval >> > join开分钟级别的数据还要准确?针对这个问题,不知道大家有什么看法和思路? >> > 我的一个猜测是我设置的表的ttl没有生效,inner join一直使用的是全量的数据,所以结果准确度要比interval >> > join高,我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志,我的配置如下: >> > Configuration conf = new Configuration(); >> > conf.setString("table.exec.mini-batch.enabled","true"); >> > conf.setString("table.exec.mini-batch.allow-latency","15 s"); >> > conf.setString("table.exec.mini-batch.size","100"); >> > conf.setString("table.exec.state.ttl","20 s"); >> > env.configure(conf); >> > StreamTableEnvironment streamTableEnvironment = >> > StreamTableEnvironment.create(env, >> > EnvironmentSettings.fromConfiguration(conf)); >> > >> > >> > 我想了解下,从tm和jm日志是否能正确反应我的配置生效?如果不行,那我要使用什么方法才能知道我的这个配置是否生效? >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > 在 2022-06-13 21:12:48,"Xuyang" 写道: >> > >Hi, >> > > 1、理论上来说inner join关联的数据量应该比interval >> > join更大吧。关于左右两边流速度不一致的情况,理论上应该问题不大,因为需要等到两边的watermark都到齐之后才会触发状态里过期数据的清除。 >> > > 2、inner >> > >> join没有水印的情况下,就是到了就发,完全根据这条数据进入这个算子的时间来算,也就是“处理时间”。默认数据是不会过期的,会存全量数据。如果定义了ttl,得看join两侧的表的pk和join >> > >> key,分别用不同的state数据格式来存数据(ListState、MapState等),原则上是多长时间没更新数据之后,就清空过期数据,是按照的“处理时间”来处理的。 >> > > >> > > >> > >如果我有不对的地方,请指正我哈。 >> > > >> > > >> > > >> > > >> > >-- >> > > >> >
Re:Re:Re: Re: Flink 使用interval join数据丢失疑问
Hi, 我目前使用sql interval join,窗口的上下界增加到分钟级别,分别是-2 minute 和 +4 minute 目前来看数据量和使用inner join要差不多了。以下是代码 Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream, Schema.newBuilder() .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))") .watermark("rowtime", "rowtime + INTERVAL '2' SECOND") .build()); Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream, Schema.newBuilder() .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))") .watermark("rowtime", "rowtime + INTERVAL '2' SECOND") .build()); streamTableEnvironment.createTemporaryView("header",headerTable); streamTableEnvironment.createTemporaryView("item",itemTable); Table result = streamTableEnvironment.sqlQuery("select header.customer_id" + ",item.goods_id" + ",header.id" + ",header.order_status" + ",header.shop_id" + ",header.parent_order_id" + ",header.order_at" + ",header.pay_at" + ",header.channel_id" + ",header.root_order_id" + ",item.id" + ",item.row_num" + ",item.p_sp_sub_amt" + ",item.display_qty" + ",item.qty" + ",item.bom_type" + " from item JOIN header on header.id = item.order_id and item.rowtime BETWEEN header.rowtime - INTERVAL '2' MINUTE AND header.rowtime + INTERVAL '4' MINUTE"); 对此,我又有新的疑问了,我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval join开分钟级别的数据还要准确?针对这个问题,不知道大家有什么看法和思路? 我的一个猜测是我设置的表的ttl没有生效,inner join一直使用的是全量的数据,所以结果准确度要比interval join高,我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志,我的配置如下: Configuration conf = new Configuration(); conf.setString("table.exec.mini-batch.enabled","true"); conf.setString("table.exec.mini-batch.allow-latency","15 s"); conf.setString("table.exec.mini-batch.size","100"); conf.setString("table.exec.state.ttl","20 s"); env.configure(conf); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf)); 我想了解下,从tm和jm日志是否能正确反应我的配置生效?如果不行,那我要使用什么方法才能知道我的这个配置是否生效? 在 2022-06-13 21:12:48,"Xuyang" 写道: >Hi, > 1、理论上来说inner join关联的数据量应该比interval > join更大吧。关于左右两边流速度不一致的情况,理论上应该问题不大,因为需要等到两边的watermark都到齐之后才会触发状态里过期数据的清除。 > 2、inner > join没有水印的情况下,就是到了就发,完全根据这条数据进入这个算子的时间来算,也就是“处理时间”。默认数据是不会过期的,会存全量数据。如果定义了ttl,得看join两侧的表的pk和join > > key,分别用不同的state数据格式来存数据(ListState、MapState等),原则上是多长时间没更新数据之后,就清空过期数据,是按照的“处理时间”来处理的。 > > >如果我有不对的地方,请指正我哈。 > > > > >-- > >Best! >Xuyang > > > > > >在 2022-06-12 14:39:39,"lxk7...@163.com" 写道: >>非常感谢回复 >>1.针对watermark,我会再去进行测试。同时还会测试使用处理时间,interval join会不会丢失数据 >>2.针对interval jon,我个人的理解是它能关联到的数据范围要比inner >>join大,所以数据应该更准确,但是从结果上看却是数据丢失,当时非常震惊,有点颠覆我的认知了。同时我自己还有一个新的猜测,就是两个流的数据量不一样,可能也会造成数据丢失。目前左流是订单粒度数据,右流是订单-商品粒度数据,数据量要大很多。我个人理解,在处理右流的时候,应该会慢一点,所以导致两边的时间进展可能不一致。但是这又引发了一个新的疑问?inner >> join应该也会受这样的影响 >>3.还有一个问题可能是我没有阐述清楚,我在sql里使用inner >>join,没有注册水印,那么两个流的join应该是以处理时间来定义的?那么表的state的过期是否也是以处理时间来定义? >> >> >> >>lxk7...@163.com >> >>发件人: Shengkai Fang >>发送时间: 2022-06-11 20:35 >>收件人: user-zh >>主题: Re: Re: Flink 使用interval join数据丢失疑问 >>hi, >> >>对于第一点,丢数据的情况有很多。首先,要确认是不是 JOIN 算子丢数据(SINK 的使用不当也会丢数据)。如果明确了是 join >>算子丢的数据,建议明确下丢的数据是咋样的,是不是 watermark 设置不合理,导致数据被误认为是晚到数据从而被丢了。例如,这里的是 `event >>time` = `rowtime` - 2s,是不是不合适,我咋记得一般都是 +2 s 呢? >> >>对于第二点,interval join 我个人初步的理解是 state 的清理是根据两边的 event time,也就是说,如果右流的 event >>time 的更新会影响左流的数据清理。比如说右流的时间点到了 12:00,join 条件要求左流的时间不会晚于右流的时间 1h,那么左流 >>11:00之前的数据都可以被清理了。 >> >>对于第三点,我觉得是不能的。目前的 inner join + state 清理无法覆盖 event time 的window join 的。 >> >>best, >>Shengkai >> >>lxk7...@163.com 于2022年6月10日周五 23:03写道: >> >>> 对于这个问题,我还是有很大的疑问,再把我这个场景描述一下: >>> >>> 目前是使用flink进行双流join,两个流的数据,一个流是订单主表,另一个流是订单明细表。我们探查了离线的数据,订单明细表一般会在订单主表生成后晚几秒内生成,这个差异在秒级别。 >>> 我们做了以下几轮测试,并对比了另一个实时落的表数据量。(这个表就是基准参照数据,只是简单落表,没做任何处理,两边的数据源一致,对比的口径一致。) >>> 1.使用datastream api,使用kafka自带的时间戳做水印,使用interval join。对比完结果,数据少。 >>> 2.使用流转表,sql inner join,没有设置watermark。对比完结果数据正常。 >>> 3.使用流转表,sql interval join,从数据中的事件时间提取水印,对比完结果数据,数据少。 >>> 从结果上看,我不太明白为什么sql里inner join能保证数据准确,而interval >>> join不行?有什么好的方式或者思路能让我更好的去尝试了解这个问题产生的原因 >>> >>> 针对第二种方式,我的疑问是,sql里没有设置水印,那么表的state过期是以处理时间来计算吗?针对这种设置了表state过期时间的join,我能理解为这个inner >>> join其实是一个window join吗? >>> >>> >>> >>> lxk7...@163.com >>> >>> 发件人: lxk &g
Re:Flink JobManager 节点 JVM Metaspace 过高
可以把堆栈的日志打印出来看看 在 2022-06-10 18:15:53,"Summer" 写道: 使用 FinkUI 上传 Flink 任务 Jar 时,任务启动失败。 这时候JVM Metaspace就会异常增加。 这是什么原因?
Re:Re:Re:Re:Re:Re:Re: Flink 使用interval join数据丢失疑问
owtime0, +(rowtime, 2:INTERVAL SECOND)))], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, rowtime, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, rowtime0]) :- Exchange(distribution=[hash[id]]) : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 2000:INTERVAL SECOND)]) : +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime]) :+- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type]) +- Exchange(distribution=[hash[order_id]]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 2000:INTERVAL SECOND)]) +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime]) +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag]) == Optimized Execution Plan == Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type]) +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-2, leftUpperBound=1, leftTimeIndex=9, rightTimeIndex=8], where=[((id = order_id) AND (rowtime0 >= (rowtime - 1:INTERVAL SECOND)) AND (rowtime0 <= (rowtime + 2:INTERVAL SECOND)))], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, rowtime, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, rowtime0]) :- Exchange(distribution=[hash[id]]) : +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 2000:INTERVAL SECOND)]) : +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime]) :+- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type]) +- Exchange(distribution=[hash[order_id]]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 2000:INTERVAL SECOND)]) +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime]) +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag]) 在 2022-06-10 17:16:31,"lxk" 写道: >使用sql 进行interval >join,我目前的问题是感觉时间转换这块不太友好,我目前流里面的事件时间字段是string类型,数据样式是2022-06-10 >13:08:55,但是我使用TO_TIMESTAMP这个函数进行转换一直报错 > > > > > > > > > > > > > > > > > >在 2022-06-10 15:04:31,"Xuyang" 写道: >>Hi, datastream的这个interval join的api应该对标的是sql中的interval >>join。但是你目前写的这个sql,是普通join。普通join和interval >>join在业务含义和实现上都是有区别的。所以你直接拿datastream api的interval >>join和sql上的普通join结果对比,其实是有问题的。所以我之前的建议是让你试下让sql也使用interval join,这样双方才有可比性。 >> >> >>另外sql中设置的table.exec.state.ttl这个参数,只是代表的state会20s清空过期数据,但我看你要比较的时间窗口是-10s和20s,貌似也不大一样。 >> >> >> >> >>-- >> >>Best! >>Xuyang >> >> >> >> >> >>在 2022-06-10 14:33:37,"lxk" 写道: >>> >>> >>> >>>我不理解的点在于,我interval join开的时间窗口比我sql中设置的状态时间都要长,窗口的上下界别是-10s 和 20s,为什么会丢数据? >>> >>>sql中我设置这个table.exec.state.ttl参数 >>>为20s,照理来说两个流应该也是保留20s的数据在状态中进行join。不知道我的理解是否有问题,希望能够得到解答。 >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>>在 2022-06-10 14:15:29,"Xuyang" 写道: >>>>Hi, 你的这条SQL 并不是interval join,是普通join。 >>>>interval join的使用文档可以参考文档[1]。可以试下使用SQL interval >>>>join会不会丢数据(注意设置state的ttl),从而判断是数据的问题还是datastream api的问题。 >>>> >>>> >>>> >>>> >>>>[1] >>>>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-
Re:Re:Re:Re:Re:Re: Flink 使用interval join数据丢失疑问
使用sql 进行interval join,我目前的问题是感觉时间转换这块不太友好,我目前流里面的事件时间字段是string类型,数据样式是2022-06-10 13:08:55,但是我使用TO_TIMESTAMP这个函数进行转换一直报错 在 2022-06-10 15:04:31,"Xuyang" 写道: >Hi, datastream的这个interval join的api应该对标的是sql中的interval >join。但是你目前写的这个sql,是普通join。普通join和interval join在业务含义和实现上都是有区别的。所以你直接拿datastream >api的interval join和sql上的普通join结果对比,其实是有问题的。所以我之前的建议是让你试下让sql也使用interval >join,这样双方才有可比性。 > > >另外sql中设置的table.exec.state.ttl这个参数,只是代表的state会20s清空过期数据,但我看你要比较的时间窗口是-10s和20s,貌似也不大一样。 > > > > >-- > >Best! >Xuyang > > > > > >在 2022-06-10 14:33:37,"lxk" 写道: >> >> >> >>我不理解的点在于,我interval join开的时间窗口比我sql中设置的状态时间都要长,窗口的上下界别是-10s 和 20s,为什么会丢数据? >> >>sql中我设置这个table.exec.state.ttl参数 >>为20s,照理来说两个流应该也是保留20s的数据在状态中进行join。不知道我的理解是否有问题,希望能够得到解答。 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>在 2022-06-10 14:15:29,"Xuyang" 写道: >>>Hi, 你的这条SQL 并不是interval join,是普通join。 >>>interval join的使用文档可以参考文档[1]。可以试下使用SQL interval >>>join会不会丢数据(注意设置state的ttl),从而判断是数据的问题还是datastream api的问题。 >>> >>> >>> >>> >>>[1] >>>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins >>> >>> >>> >>> >>> >>> >>> >>>-- >>> >>>Best! >>>Xuyang >>> >>> >>> >>> >>> >>>在 2022-06-10 11:26:33,"lxk" 写道: >>>>我用的是以下代码: >>>>String s = streamTableEnvironment.explainSql("select header.customer_id" + >>>>",item.goods_id" + >>>>",header.id" + >>>>",header.order_status" + >>>>",header.shop_id" + >>>>",header.parent_order_id" + >>>>",header.order_at" + >>>>",header.pay_at" + >>>>",header.channel_id" + >>>>",header.root_order_id" + >>>>",item.id" + >>>>",item.row_num" + >>>>",item.p_sp_sub_amt" + >>>>",item.display_qty" + >>>>",item.qty" + >>>>",item.bom_type" + >>>>" from header JOIN item on header.id = item.order_id"); >>>> >>>>System.out.println("explain:" + s); >>>> >>>> >>>> >>>> >>>>plan信息为: >>>>explain:== Abstract Syntax Tree == >>>>LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], >>>>order_status=[$1], shop_id=[$3], parent_order_id=[$4], order_at=[$5], >>>>pay_at=[$6], channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14], >>>>p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20]) >>>>+- LogicalJoin(condition=[=($0, $13)], joinType=[inner]) >>>> :- LogicalTableScan(table=[[default_catalog, default_database, >>>> Unregistered_DataStream_Source_5]]) >>>> +- LogicalTableScan(table=[[default_catalog, default_database, >>>> Unregistered_DataStream_Source_8]]) >>>> >>>> >>>>== Optimized Physical Plan == >>>>Calc(select=[customer_id, goods_id, id, order_status, shop_id, >>>>parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, >>>>p_sp_sub_amt, display_qty, qty, bom_type]) >>>>+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id, >>>>order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, >>>>channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, >>>>p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], >>>>rightInputSpec=[NoUniqueKey]) >>>> :- Exchange(distribution=[hash[id]]) >>>> : +- Calc(select=[id, order_status, customer_id, shop_id, >>>> parent_order_id, order_at, pay_at, channel_id, root_order_id]) >>>> : +- TableSourceScan(table=[[default_catalog, default_database, >>>> Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, >>>> shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, >>>> last_updated_at, business_flag, mysql_op_type]) >>>> +- Exchange(distribution=[hash[order_id]]) >>>> +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, >>>> bom_type, display_qty]) >>>> +- Ta
Re:Re: Flink 使用interval join数据丢失疑问
我用的是以下代码: String s = streamTableEnvironment.explainSql("select header.customer_id" + ",item.goods_id" + ",header.id" + ",header.order_status" + ",header.shop_id" + ",header.parent_order_id" + ",header.order_at" + ",header.pay_at" + ",header.channel_id" + ",header.root_order_id" + ",item.id" + ",item.row_num" + ",item.p_sp_sub_amt" + ",item.display_qty" + ",item.qty" + ",item.bom_type" + " from header JOIN item on header.id = item.order_id"); System.out.println("explain:" + s); plan信息为: explain:== Abstract Syntax Tree == LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], order_status=[$1], shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14], p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20]) +- LogicalJoin(condition=[=($0, $13)], joinType=[inner]) :- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]]) +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]]) == Optimized Physical Plan == Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type]) +- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[id]]) : +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id]) : +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type]) +- Exchange(distribution=[hash[order_id]]) +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty]) +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag]) == Optimized Execution Plan == Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type]) +- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[id]]) : +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id]) : +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type]) +- Exchange(distribution=[hash[order_id]]) +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty]) +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag]) 在 2022-06-10 11:02:56,"Shengkai Fang" 写道: >你好,能提供下具体的 plan 供大家查看下吗? > >你可以直接 使用 tEnv.executeSql("Explain JSON_EXECUTION_PLAN >").print() 打印下相关的信息。 > >Best, >Shengkai > >lxk 于2022年6月10日周五 10:29写道: > >> flink 版本:1.14.4 >> 目前在使用flink interval join进行数据关联,在测试的时候发现一个问题,就是使用interval >> join完之后数据会丢失,但是使用sql api,直接进行join,数据是正常的,没有丢失。 >> 水印是直接使用kafka 自带的时间戳生成watermark >> >> >> 以下是代码 ---interval join >> >> SingleOutputStreamOperator headerFullStream = >> headerFilterStream.keyBy(data -> data.getId()) >> .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id())) >> .between(Time.seconds(-10), Time.seconds(20)) >> .process(new ProcessJoinFunction() { >> @Override >> public void processElement(OrderHeader left, OrderItem right, Context >> context, Collector collector) throws Exception { >> HeaderFull headerFull = new HeaderFull(); >> BeanUtilsBean beanU
Flink 使用interval join数据丢失疑问
flink 版本:1.14.4 目前在使用flink interval join进行数据关联,在测试的时候发现一个问题,就是使用interval join完之后数据会丢失,但是使用sql api,直接进行join,数据是正常的,没有丢失。 水印是直接使用kafka 自带的时间戳生成watermark 以下是代码 ---interval join SingleOutputStreamOperator headerFullStream = headerFilterStream.keyBy(data -> data.getId()) .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id())) .between(Time.seconds(-10), Time.seconds(20)) .process(new ProcessJoinFunction() { @Override public void processElement(OrderHeader left, OrderItem right, Context context, Collector collector) throws Exception { HeaderFull headerFull = new HeaderFull(); BeanUtilsBean beanUtilsBean = new BeanUtilsBean(); beanUtilsBean.copyProperties(headerFull, left); beanUtilsBean.copyProperties(headerFull, right); String event_date = left.getOrder_at().substring(0, 10); headerFull.setEvent_date(event_date); headerFull.setItem_id(right.getId()); collector.collect(headerFull); } } 使用sql 进行join Configuration conf = new Configuration(); conf.setString("table.exec.mini-batch.enabled","true"); conf.setString("table.exec.mini-batch.allow-latency","15 s"); conf.setString("table.exec.mini-batch.size","100"); conf.setString("table.exec.state.ttl","20 s"); env.configure(conf); Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream); Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream); streamTableEnvironment.createTemporaryView("header",headerTable); streamTableEnvironment.createTemporaryView("item",itemTable); Table result = streamTableEnvironment.sqlQuery("select header.customer_id" + ",item.goods_id" + ",header.id" + ",header.order_status" + ",header.shop_id" + ",header.parent_order_id" + ",header.order_at" + ",header.pay_at" + ",header.channel_id" + ",header.root_order_id" + ",item.id" + ",item.row_num" + ",item.p_sp_sub_amt" + ",item.display_qty" + ",item.qty" + ",item.bom_type" + " from header JOIN item on header.id = item.order_id"); DataStream rowDataStream = streamTableEnvironment.toChangelogStream(result); 不太理解为什么使用interval join会丢这么多数据,按照我的理解使用sql join,底层应该也是用的类似interval join,为啥两者最终关联上的结果差异这么大。
Re:Flink写入CK数据丢失问题
补充一下图片 https://s2.loli.net/2022/06/02/C5it4rPFgmJlopZ.png https://s2.loli.net/2022/06/02/3ri2HIv1RsAawBW.png https://s2.loli.net/2022/06/02/efVWPvXCYFwhgTp.png https://s2.loli.net/2022/06/02/9UptbNaWvs7xwXC.png 在 2022-06-02 11:38:24,"lxk" 写道: 各位,请教个问题 目前使用flink往ck写入数据,使用的是datastream api以及rocksdb状态后端,程序中了开了两个窗口,都是10秒级别。同时还使用了sql进行group by 求和,求和的操作没有加窗口,同时streamtableenv 设置了状态生存时间为10s. 在跟离线端对比数据的时候发现,一段时间内的数据跟离线差异不大,从0点-17点(数据的事件时间),但是18点(事件时间)以后的数据实时端差异量特别大。 目前在webui上发现整个管理内存的使用率已经占满,不知道是否跟这个有关系。 还有一点现象是,今天的数据我们对比了ck上实时的表(正确的),总体数据量还是要小很多。但是当我从零点重新消费,目前来看今天的数据能够对上,不知道是否是因为程序运行一段时间后,整个管理内存都被占满了,从而导致原本缓存的数据丢失了。 以下是相应的算子链和整个tm内存情况。出现反压是因为从今天0点重新开始消费了。
Flink写入CK数据丢失问题
各位,请教个问题 目前使用flink往ck写入数据,使用的是datastream api以及rocksdb状态后端,程序中了开了两个窗口,都是10秒级别。同时还使用了sql进行group by 求和,求和的操作没有加窗口,同时streamtableenv 设置了状态生存时间为10s. 在跟离线端对比数据的时候发现,一段时间内的数据跟离线差异不大,从0点-17点(数据的事件时间),但是18点(事件时间)以后的数据实时端差异量特别大。 目前在webui上发现整个管理内存的使用率已经占满,不知道是否跟这个有关系。 还有一点现象是,今天的数据我们对比了ck上实时的表(正确的),总体数据量还是要小很多。但是当我从零点重新消费,目前来看今天的数据能够对上,不知道是否是因为程序运行一段时间后,整个管理内存都被占满了,从而导致原本缓存的数据丢失了。 以下是相应的算子链和整个tm内存情况。出现反压是因为从今天0点重新开始消费了。