Re: Solution for FlinkSQL sliding windows with a large window and a small slide
Hi,

Is it that the amount of data emitted after the window fires is too large? Could increasing the resources and raising the parallelism of the window computation mitigate the problem?

Best,
Shammon FY

On Fri, May 26, 2023 at 2:03 PM tanjialiang wrote:
> Hi, all.
> I ran into some problems using FlinkSQL's window TVF sliding windows.
> With a sliding window of 5-minute slide and 24-hour size, grouped by user_id, checkpoints can hardly ever succeed when the job fails over or consumes from Kafka's earliest offset.
> Because consumption starts from earliest, the data quickly fills the buffers and creates backpressure. That batch of data may then trigger N window firings toward downstream, each firing costing (user base * 24 * 60 / 5), so the checkpoint barrier can stay stuck indefinitely.
> Is there any way to break out of this?
>
> best,
> tanjialiang.
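For reference, the kind of query being discussed looks roughly like the following sketch (the table and column names are illustrative, and the datagen source stands in for the real Kafka source; the 5-minute slide / 24-hour size matches the numbers above):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class HopWindowSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // Hypothetical source table; the real job reads from Kafka.
            tEnv.executeSql(
                    "CREATE TABLE events ("
                            + "  user_id STRING,"
                            + "  event_time TIMESTAMP(3),"
                            + "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND"
                            + ") WITH ('connector' = 'datagen')");

            // HOP window TVF: 24-hour window sliding every 5 minutes. Each row
            // belongs to 24 * 60 / 5 = 288 overlapping windows, which is the
            // per-record merge cost mentioned in this thread.
            tEnv.executeSql(
                    "SELECT user_id, window_start, window_end, COUNT(*) AS cnt "
                            + "FROM TABLE(HOP(TABLE events, DESCRIPTOR(event_time), "
                            + "  INTERVAL '5' MINUTE, INTERVAL '24' HOUR)) "
                            + "GROUP BY user_id, window_start, window_end")
                    .print();
        }
    }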
Error when writing to Kafka with exactly-once semantics in Flink
Our project writes to Kafka with exactly-once semantics; the code and configuration are as follows.

Sink code:

    Properties producerProperties = MyKafkaUtil.getProducerProperties();
    KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
            .setBootstrapServers(Event2Kafka.parameterTool.get("bootstrap.server"))
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(Event2Kafka.parameterTool.get("feature.topic.name"))
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setKafkaProducerConfig(producerProperties)
            .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("streamx_flow_1261")
            .build();
    eventJsonStream.sinkTo(kafkaSink).setParallelism(14)
            .name("event2kafka").uid("kafkasink");

Kafka configuration:

    public static Properties getProducerProperties() {
        Properties kafkaProducerProps = new Properties();
        kafkaProducerProps.setProperty("bootstrap.servers", parameterTool.get("bootstrap.server"));
        kafkaProducerProps.setProperty("auto.commit.interval.ms", "5000");
        kafkaProducerProps.setProperty("auto.offset.reset", "latest");
        kafkaProducerProps.setProperty("session.timeout.ms", "5000");
        kafkaProducerProps.setProperty("transaction.timeout.ms", 12 * 6 + "");
        kafkaProducerProps.put("security.protocol", "SASL_PLAINTEXT");
        kafkaProducerProps.put("sasl.kerberos.service.name", "kafka");
        return kafkaProducerProps;
    }

The job ran for a long time without any problem, but recently it suddenly failed with the following error:

    org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka -topic-2@-1 with FlinkKafkaInternalProducer{transactionalId='streamx_flow_1261-8-5', inTransaction=true, closed=false}
        at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:436)
        at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:417)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number.

I checked this answer on Stack Overflow:
https://stackoverflow.com/questions/55192852/transactional-producer-vs-just-idempotent-producer-java-exception-outoforderseq
but I have not set any of the parameters it mentions; everything uses the default configuration, so in theory this problem should not occur. Does anyone have any thoughts? Or is there something wrong or missing in my configuration?
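One thing that stands out in the configuration above (an observation, not a confirmed root cause): as quoted, 12 * 6 + "" evaluates to the string "72", i.e. a 72 ms transaction timeout; if that is really what runs (rather than a transcription slip in the mail), it is far below any workable value. For EXACTLY_ONCE delivery, transaction.timeout.ms must exceed the maximum expected checkpoint duration plus the maximum expected restart time, and it must not exceed the broker-side transaction.max.timeout.ms (15 minutes by default). A sketch of a more explicit setting:

    import java.util.Properties;

    public class ExactlyOnceProducerProps {
        public static Properties sketch() {
            Properties props = new Properties();
            // 15 minutes, spelled out in milliseconds. Must be >= the maximum
            // checkpoint duration + restart time, and <= the broker's
            // transaction.max.timeout.ms (default 15 minutes).
            props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));
            return props;
        }
    }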
Re: Solution for FlinkSQL sliding windows with a large window and a small slide
Hi, Shammon FY.

In theory raising the parallelism can mitigate it, but a large increase in parallelism may be too costly. The ingestion itself does not need many resources; the problem is that the data volume emitted when the window fires is too large (the user base), and the per-record merge cost is too high (aggregating a single record across its windows requires 24 * 60 / 5 = 288 merges).

So far I can only think of the following workarounds:
1. Increase the window slide. This solves the problem at the root (but a longer slide means window firings are delayed).
2. Increase the slide and use the early-fire mechanism to emit windows that have not finished computing (the emitted data may not match the "last 24 hours" business semantics, and the downstream system must support upsert). See the sketch after the quoted message below.
3. Use external storage: have Flink write directly, or in pre-aggregated form, into an OLAP system (e.g. Doris/ClickHouse) and aggregate at read time (this requires a stable and reliable external store).

Do you run into this kind of problem when computing sliding windows with Flink? Are there other, better solutions? Looking forward to your feedback.

best,
tanjialiang.

On Mon, May 29, 2023 at 09:08, Shammon FY wrote:
> Hi,
> Is it that the amount of data emitted after the window fires is too large? Could increasing the resources and raising the parallelism of the window computation mitigate the problem?
> Best,
> Shammon FY
>
> On Fri, May 26, 2023 at 2:03 PM tanjialiang wrote:
>> Hi, all.
>> I ran into some problems using FlinkSQL's window TVF sliding windows.
>> With a sliding window of 5-minute slide and 24-hour size, grouped by user_id, checkpoints can hardly ever succeed when the job fails over or consumes from Kafka's earliest offset.
>> Because consumption starts from earliest, the data quickly fills the buffers and creates backpressure. That batch of data may then trigger N window firings toward downstream, each firing costing (user base * 24 * 60 / 5), so the checkpoint barrier can stay stuck indefinitely.
>> Is there any way to break out of this?
>>
>> best,
>> tanjialiang.
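For option 2, Flink's table runtime has experimental early-fire options. They are not part of the documented public configuration, and in recent versions they apply to the legacy group window aggregations (GROUP BY HOP(...)) rather than to window TVFs, so treat this as a sketch only (the 1-minute delay is illustrative):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class EarlyFireSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // Experimental options; behavior may change between versions.
            // Early firing emits partial window results every "delay", so the
            // downstream system must support upsert semantics.
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.emit.early-fire.enabled", "true");
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.emit.early-fire.delay", "1 min");
        }
    }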
Re: [ANNOUNCE] Apache Flink 1.16.2 released
Hi Jing,

Thank you for caring about the release process. It has to be said that the entire process went smoothly; we have very comprehensive documentation[1] to guide the work, thanks to the contributions of previous release managers and the community.

Regarding obstacles, I actually only had one minor problem: we use an older twine (1.12.0) to deploy python artifacts to PyPI, and its compatible dependencies (such as urllib3) are also older. When I tried twine upload, it did not work as expected because the version of urllib3 installed on my machine was newer. To solve this, I had to downgrade some dependencies. I added a notice to the cwiki page[1] so that future release managers do not run into the same problem.

It seems this is a known issue (see the comments in [2]) that has been resolved in newer versions of twine. I wonder if we can upgrade the twine version? Does anyone remember why we pinned such an old version (1.12.0)?

Best regards,

Weijie

[1] https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release
[2] https://github.com/pypa/twine/issues/997

Jing Ge wrote on Sat, May 27, 2023 at 00:15:
> Hi Weijie,
>
> Thanks again for your effort. I was wondering if there were any obstacles
> you had to overcome while releasing 1.16.2 and 1.17.1 that could point us
> to improvements in the release process and management?
>
> Best regards,
> Jing
>
> On Fri, May 26, 2023 at 4:41 PM Martijn Visser wrote:
>
> > Thank you Weijie and those who helped with testing!
> >
> > On Fri, May 26, 2023 at 1:06 PM weijie guo wrote:
> >
> > > The Apache Flink community is very happy to announce the release of
> > > Apache Flink 1.16.2, which is the second bugfix release for the Apache
> > > Flink 1.16 series.
> > >
> > > Apache Flink® is an open-source stream processing framework for
> > > distributed, high-performing, always-available, and accurate data
> > > streaming applications.
> > >
> > > The release is available for download at:
> > > https://flink.apache.org/downloads.html
> > >
> > > Please check out the release blog post for an overview of the
> > > improvements for this bugfix release:
> > > https://flink.apache.org/news/2023/05/25/release-1.16.2.html
> > >
> > > The full release notes are available in Jira:
> > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352765
> > >
> > > We would like to thank all contributors of the Apache Flink community
> > > who made this release possible!
> > >
> > > Feel free to reach out to the release managers (or respond to this
> > > thread) with feedback on the release process. Our goal is to
> > > constantly improve the release process. Feedback on what could be
> > > improved or things that didn't go so well are appreciated.
> > >
> > > Regards,
> > > Release Manager
Flink: outputting dirty data
Dear teachers, I have a job that has been running for a long time, but recently some dirty data from the source system caused it to fail; the yarn logs show a NullPointerException. I would like to capture that one dirty record. Is there any way to do this? Thank you for your guidance.

小昌同学
ccc0606fight...@163.com
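A common pattern for capturing such records (a minimal sketch, not from this thread; parse() and the String element type are illustrative stand-ins for the real logic) is to wrap the failing step in a try/catch inside a ProcessFunction and route the offending records to a side output instead of letting the NullPointerException kill the job:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class DirtyDataCapture {

        // Side output that collects the raw records which fail processing.
        static final OutputTag<String> DIRTY = new OutputTag<String>("dirty-records") {};

        public static SingleOutputStreamOperator<String> guard(DataStream<String> input) {
            return input.process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) {
                    try {
                        // parse(value) stands in for the step that currently
                        // throws the NullPointerException.
                        out.collect(parse(value));
                    } catch (Exception e) {
                        // Route the offending raw record to the side output
                        // instead of failing the job.
                        ctx.output(DIRTY, value);
                    }
                }
            });
        }

        private static String parse(String raw) {
            // Hypothetical placeholder for the real transformation.
            return raw.trim();
        }
    }

The dirty records can then be read with guard(...).getSideOutput(DIRTY) and printed or written to a dead-letter topic for inspection.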
Custom trigger firing issue
I have a custom trigger that emits results when either maxCount is reached or the window end time arrives.

Problem: for the same window, onProcessingTime fires multiple times at window end. In theory each window should fire only once when its end time is reached. What could be the reason? (One possible cause is sketched after this message.)

Main class fragment:

    SingleOutputStreamOperator windowMap = afterMap
            .timeWindowAll(Time.seconds(5))
            .trigger(new CountAndProcessingTimeTrigger(100))
            .process(simpleConfig.getWindowFunction().newInstance());

Trigger code:

    public class CountAndProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
        // Maximum element count per window.
        private final long maxCount;
        private final ReducingStateDescriptor<Long> stateDesc;

        public CountAndProcessingTimeTrigger(long maxCount) {
            this.stateDesc = new ReducingStateDescriptor<>("count_time",
                    new CountAndProcessingTimeTrigger.Sum(), LongSerializer.INSTANCE);
            this.maxCount = maxCount;
        }

        /**
         * Called for every element added to the window.
         *
         * CONTINUE: do nothing.
         * FIRE: trigger the computation and keep the window contents.
         * PURGE: simply drop the window contents, keeping any window and
         * trigger meta state.
         * FIRE_AND_PURGE: trigger the computation, then clear the window's
         * elements.
         */
        @Override
        public TriggerResult onElement(Object o, long timestamp, TimeWindow window,
                                       TriggerContext triggerContext) throws Exception {
            triggerContext.registerProcessingTimeTimer(window.maxTimestamp());
            ReducingState<Long> countState = triggerContext.getPartitionedState(stateDesc);
            countState.add(1L);
            if (countState.get() >= maxCount) {
                log.info("countTrigger: {}", countState.get());
                countState.clear();
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        /**
         * Called when the window's processing-time timer fires (window end).
         */
        @Override
        public TriggerResult onProcessingTime(long timestamp, TimeWindow window,
                                              TriggerContext triggerContext) throws Exception {
            ReducingState<Long> countState = triggerContext.getPartitionedState(stateDesc);
            log.info("timeTrigger: {}, windowMaxTimestamp: {}", countState.get(), window.maxTimestamp());
            countState.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onEventTime(long timestamp, TimeWindow window,
                                         TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public boolean canMerge() {
            return false;
        }

        @Override
        public void onMerge(TimeWindow window, OnMergeContext ctx) {
            ctx.mergePartitionedState(stateDesc);
            long windowMaxTimestamp = window.maxTimestamp();
            if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
                ctx.registerProcessingTimeTimer(windowMaxTimestamp);
            }
        }

        /**
         * Called when the window is removed.
         */
        @Override
        public void clear(TimeWindow window, TriggerContext triggerContext) throws Exception {
            triggerContext.deleteProcessingTimeTimer(window.maxTimestamp());
            triggerContext.getPartitionedState(stateDesc).clear();
        }

        /**
         * Counting reduce function.
         */
        private static class Sum implements ReduceFunction<Long> {
            private static final long serialVersionUID = 1L;

            private Sum() {
            }

            public Long reduce(Long value1, Long value2) throws Exception {
                return value1 + value2;
            }
        }
    }

吴先生
15951914...@163.com
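A hedged guess rather than a confirmed diagnosis: in the onElement above, when the count threshold is reached the trigger returns FIRE_AND_PURGE but never deletes the processing-time timer registered for window.maxTimestamp() (clear() only runs when the window itself is removed), so a count-based firing can still be followed by another onProcessingTime firing at window end, and elements arriving after the purge re-register the timer. A sketch of onElement that removes the timer on the count path (same stateDesc/maxCount fields as above):

    @Override
    public TriggerResult onElement(Object o, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        ReducingState<Long> countState = ctx.getPartitionedState(stateDesc);
        countState.add(1L);
        if (countState.get() >= maxCount) {
            countState.clear();
            // Remove the end-of-window timer so a count-based firing is not
            // followed by another firing when the window end time arrives.
            ctx.deleteProcessingTimeTimer(window.maxTimestamp());
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }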